kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
Date Tue, 20 Feb 2018 09:33:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369845#comment-16369845 ]

ASF GitHub Bot commented on KAFKA-6554:
---------------------------------------

rajinisivaram closed pull request #4585: KAFKA-6554; Missing lastOffsetDelta validation before log append
URL: https://github.com/apache/kafka/pull/4585
 
 
   

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of
provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ff8d3b990ba..71e668e45da 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -106,7 +106,7 @@
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
-    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
     static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
     static final int FIRST_TIMESTAMP_LENGTH = 8;
@@ -118,7 +118,7 @@
     static final int PRODUCER_EPOCH_LENGTH = 2;
     static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
     static final int BASE_SEQUENCE_LENGTH = 4;
-    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+    public static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
     static final int RECORDS_COUNT_LENGTH = 4;
     static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
     public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9cd06..1beb2bdda37 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,15 +74,27 @@ private[kafka] object LogValidator extends Logging {
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+        val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
+        if (countFromOffsets <= 0)
+          throw new InvalidRecordException(s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]")
+
+        // v2 and above messages always have a non-null count
+        val count = batch.countOrNull
+        if (count <= 0)
+          throw new InvalidRecordException(s"Invalid reported count for record batch: $count")
+
+        if (countFromOffsets != batch.countOrNull)
+          throw new InvalidRecordException(s"Inconsistent batch offset range [${batch.baseOffset}, ${batch.lastOffset}] " +
+            s"and count of records $count")
+      }
+
       if (batch.hasProducerId && batch.baseSequence < 0)
        throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
           s"with producerId ${batch.producerId}")
 
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
-
-      if (Option(batch.countOrNull).contains(0))
-        throw new InvalidRecordException("Record batches must contain at least one record")
     }
 
     if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 131152af430..04e89528b12 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 
@@ -147,6 +148,50 @@ class LogValidatorTest {
       compressed = true)
   }
 
+  @Test
+  def testInvalidOffsetRangeAndRecordCount(): Unit = {
+    // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+    validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3)
+
+    // Count and offset range are inconsistent or invalid
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 15, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = 3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = -3)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 0)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = -2)
+
+    // Count and offset range are consistent, but do not match the actual number of records
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 5, count = 6)
+    assertInvalidBatchCountOverrides(lastOffsetDelta = 1, count = 2)
+  }
+
+  private def assertInvalidBatchCountOverrides(lastOffsetDelta: Int, count: Int): Unit = {
+    intercept[InvalidRecordException] {
+      validateRecordBatchWithCountOverrides(lastOffsetDelta, count)
+    }
+  }
+
+  private def validateRecordBatchWithCountOverrides(lastOffsetDelta: Int, count: Int) {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE)
+    records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count)
+    records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      time = time,
+      now = time.milliseconds(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true)
+  }
+
   @Test
   def testLogAppendTimeWithoutRecompressionV2() {
     checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2)
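
A note on the visibility change: the two DefaultRecordBatch offset constants become
public precisely so that a test can overwrite the count and lastOffsetDelta fields of
an otherwise valid v2 batch directly in its backing buffer, as the new test above does.
A minimal Scala sketch of that trick, assuming a MemoryRecords value built with magic
v2 (the helper name overrideBatchHeader is illustrative, not a Kafka API):

    import org.apache.kafka.common.record.{DefaultRecordBatch, MemoryRecords}

    // Overwrite the header of the first (v2) batch in place: both fields are
    // 4-byte ints at fixed offsets from the start of the batch. This leaves
    // the batch CRC stale, which the test tolerates because validateBatch
    // inspects these header fields before any deeper record validation.
    def overrideBatchHeader(records: MemoryRecords, lastOffsetDelta: Int, count: Int): Unit = {
      records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta)
      records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count)
    }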


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Broker doesn't reject Produce request with inconsistent state
> -------------------------------------------------------------
>
>                 Key: KAFKA-6554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6554
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 1.0.0
>            Reporter: Simon Fell
>            Assignee: Jason Gustafson
>            Priority: Minor
>         Attachments: produce_v3.txt
>
>
> Produce messages of type v3 have offset deltas in each record, along with a LastOffsetDelta
> for the topic/partition set. While investigating an issue with missing offsets, I found a bug
> in a producer library where it would send multiple records but leave LastOffsetDelta at 0.
> This causes various problems, including holes in the offsets fetched by the consumer.
> As lastOffsetDelta can be computed by looking at the records, it seems like the broker should
> at least validate the LastOffsetDelta field against the contained records to stop this bad
> data from getting in; a sketch of such a check follows this quoted description.
> I've attached a decoded v3 produce message that was causing the problems and was accepted
> by the broker.
> Here's a link to the issue in the kafka library we were using, which has more context if you
> need it:
> https://github.com/Shopify/sarama/issues/1032
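
For clarity, here is a minimal standalone sketch of the consistency check the patch adds
in LogValidator.validateBatch. The RecordBatch accessors (baseOffset, lastOffset,
countOrNull) are real Kafka client APIs; the wrapper method checkBatchHeader and the
exact import path of InvalidRecordException (which has moved between Kafka versions)
are assumptions:

    import org.apache.kafka.common.record.{InvalidRecordException, RecordBatch}

    // Reject v2+ batches whose declared offset range disagrees with their
    // declared record count. Since lastOffset = baseOffset + lastOffsetDelta,
    // a producer that writes several records but leaves lastOffsetDelta at 0
    // fails the final comparison.
    def checkBatchHeader(batch: RecordBatch): Unit = {
      val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
      if (countFromOffsets <= 0)
        throw new InvalidRecordException(
          s"Batch has an invalid offset range: [${batch.baseOffset}, ${batch.lastOffset}]")

      val count: Int = batch.countOrNull // non-null for magic v2 and above
      if (count <= 0)
        throw new InvalidRecordException(s"Invalid reported count for record batch: $count")

      if (countFromOffsets != count)
        throw new InvalidRecordException(s"Inconsistent batch offset range " +
          s"[${batch.baseOffset}, ${batch.lastOffset}] and count of records $count")
    }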



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
