kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-6003; Accept appends on replicas and when rebuilding the log unconditionally
Date Mon, 09 Oct 2017 17:34:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ab554caee -> 3c96f9128


KAFKA-6003; Accept appends on replicas and when rebuilding the log unconditionally

This is a port of #4004 for the 0.11.0 branch.

With this patch, we _only_ validate appends which originate
from the client. In general, once the append is validated and written to
the leader the first time, revalidating it is undesirable since we can't
do anything if validation fails, and also because it is hard to maintain
the correct assumptions during validation, leading to spurious
validation failures.

For example, when we have compacted topics, it is possible for batches
to be compacted on the follower but not on the leader. This case would
also lead to an OutOfOrderSequenceException during replication. The same
applies to when we rebuild state from compacted topics: we would get
gaps in the sequence numbers, causing the OutOfOrderSequence.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #4020 from apurvam/KAKFA-6003-0.11.0-handle-unknown-producer-on-replica


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c96f912
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c96f912
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c96f912

Branch: refs/heads/0.11.0
Commit: 3c96f9128af8ddc5d50c64af380827f7a0038287
Parents: ab554ca
Author: Apurva Mehta <apurva@confluent.io>
Authored: Mon Oct 9 10:34:43 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Oct 9 10:34:43 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  8 +--
 core/src/main/scala/kafka/log/LogSegment.scala  |  2 +-
 .../scala/kafka/log/ProducerStateManager.scala  | 25 ++++----
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 15 +++--
 .../kafka/log/ProducerStateManagerTest.scala    | 62 +++++++++++++++++---
 6 files changed, 84 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f8b5d82..170e9cb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -501,7 +501,7 @@ class Log(@volatile var dir: File,
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
       if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog =
true)
+        val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
     }
@@ -762,7 +762,7 @@ class Log(@volatile var dir: File,
       if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
         return (updatedProducers, completedTxns.toList, maybeLastEntry)
 
-      val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
+      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
       maybeCompletedTxn.foreach(completedTxns += _)
     }
     (updatedProducers, completedTxns.toList, None)
@@ -849,9 +849,9 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              loadingFromLog: Boolean): Option[CompletedTxn] = {
+                              isFromClient: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId,
loadingFromLog))
+    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId,
isFromClient))
     appendInfo.append(batch)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4e0834c..53eafa1 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,7 +147,7 @@ class LogSegment(val log: FileRecords,
   private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch):
Unit = {
     if (batch.hasProducerId) {
       val producerId = batch.producerId
-      val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true)
+      val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7a1962a..24530da 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -91,16 +91,17 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch:
Short,
  *                                of this is the consumer offsets topic which uses producer
ids from incoming
  *                                TxnOffsetCommit, but has no sequence number to validate
and does not depend
  *                                on the deduplication which sequence numbers provide.
- * @param loadingFromLog This parameter indicates whether the new append is being loaded
directly from the log.
- *                       This is used to repopulate producer state when the broker is initialized.
The only
- *                       difference in behavior is that we do not validate the sequence number
of the first append
- *                       since we may have lost previous sequence numbers when segments were
removed due to log
- *                       retention enforcement.
+ * @param isFromClient The parameter indicates whether the write is coming from a client
or not. If it is not coming
+ *                     from a client, it could be due to replication traffic, or when rebuilding
producer state
+ *                     from the log. In the latter two cases, we should not validate the
append, but accept the
+ *                     incoming append unconditionally. This is for two reasons: first, the
write was already
+ *                     validated when received from the client. Second, the data is already
in the log, so it is not
+ *                     clear what would be achieved by validating it again.
  */
 private[log] class ProducerAppendInfo(val producerId: Long,
                                       initialEntry: ProducerIdEntry,
                                       validateSequenceNumbers: Boolean,
-                                      loadingFromLog: Boolean) {
+                                      isFromClient: Boolean) {
   private var producerEpoch = initialEntry.producerEpoch
   private var firstSeq = initialEntry.firstSeq
   private var lastSeq = initialEntry.lastSeq
@@ -108,6 +109,7 @@ private[log] class ProducerAppendInfo(val producerId: Long,
   private var maxTimestamp = initialEntry.timestamp
   private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
+
   private val transactions = ListBuffer.empty[TxnMetadata]
 
   private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = {
@@ -161,9 +163,10 @@ private[log] class ProducerAppendInfo(val producerId: Long,
              lastTimestamp: Long,
              lastOffset: Long,
              isTransactional: Boolean): Unit = {
-    if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
-      // skip validation if this is the first entry when loading from the log. Log retention
-      // will generally have removed the beginning entries from each producer id
+    if (isFromClient)
+      // We should only validate appends coming from the client. In particular, this means
that we don't validate
+      // appends for sequence numbers and epochs when building producer state from the log
or for writes on a replica.
+      // So validation only happens on the first write from the client to the partition leader.
       validateAppend(epoch, firstSeq, lastSeq)
 
     this.producerEpoch = epoch
@@ -507,9 +510,9 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
-  def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo =
+  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo =
     new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty),
validateSequenceNumbers,
-      loadingFromLog)
+      isFromClient)
 
   /**
    * Update the mapping with the given append information

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 79fe220..6ef5e19 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -274,7 +274,7 @@ class LogSegmentTest {
     val segment = createSegment(100)
     val producerEpoch = 0.toShort
     val partitionLeaderEpoch = 15
-    val sequence = 0
+    val sequence = 100
 
     val pid1 = 5L
     val pid2 = 10L

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3c80bff..d2d1d6c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -891,22 +891,27 @@ class LogTest {
     }
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test
   def testDuplicateAppendToFollower() : Unit = {
     val log = createLog(1024*1024)
     val epoch: Short = 0
     val pid = 1L
     val baseSequence = 0
     val partitionLeaderEpoch = 0
+    // The point of this test is to ensure that validation isn't performed on the follower.
     // this is a bit contrived. to trigger the duplicate case for a follower append, we have
to append
     // a batch with matching sequence numbers, but valid increasing offsets
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid,
epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
     log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid,
epoch, baseSequence,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    // Ensure that even the duplicate sequences are accepted on the follower.
+    assertEquals(4L, log.logEndOffset)
   }
 
-  @Test(expected = classOf[DuplicateSequenceNumberException])
+  @Test
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val log = createLog(1024*1024)
 
@@ -950,9 +955,11 @@ class LogTest {
 
     val records = MemoryRecords.readableRecords(buffer)
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    // Ensure that batches with duplicates are accepted on the follower.
+    assertEquals(0L, log.logEndOffset)
     log.appendAsFollower(records)
-    // Should throw a duplicate sequence exception here.
-    fail("should have thrown a DuplicateSequenceNumberException.")
+    assertEquals(5L, log.logEndOffset)
   }
 
   @Test(expected = classOf[ProducerFencedException])

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9a324aa..3c7c07b 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     append(stateManager, producerId, epoch, 0, offset + 500)
 
@@ -104,7 +104,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 15.toShort
     val sequence = Int.MaxValue
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
     append(stateManager, producerId, epoch, 1, offset + 500)
   }
 
@@ -113,7 +113,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val epoch = 5.toShort
     val sequence = 16
     val offset = 735L
-    append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true)
+    append(stateManager, producerId, epoch, sequence, offset, isFromClient = false)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
@@ -159,7 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers
= true,
-      loadingFromLog = false)
+      isFromClient = true)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional
= true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset
= 990000L,
@@ -176,7 +176,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers
= true,
-      loadingFromLog = false)
+      isFromClient = true)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional
= true)
 
     // use some other offset to simulate a follower append where the log offset metadata
won't typically
@@ -196,7 +196,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 9L
     append(stateManager, producerId, producerEpoch, 0, offset)
 
-    val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
     var lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
@@ -319,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite {
   }
 
   @Test
+  def testAcceptAppendWithoutProducerStateOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    append(stateManager, producerId, epoch, 1, 1L, 1)
+
+    stateManager.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 1L, 70000)
+
+    val sequence = 2
+    // entry added after recovery. The pid should be expired now, and would not exist in
the pid mapping. Nonetheless
+    // the append on a replica should be accepted with the local producer state updated to
the appended value.
+    assertFalse(recoveredMapping.activeProducers.contains(producerId))
+    append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false)
+    assertTrue(recoveredMapping.activeProducers.contains(producerId))
+    val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head
+    assertEquals(epoch, producerIdEntry.producerEpoch)
+    assertEquals(sequence, producerIdEntry.firstSeq)
+    assertEquals(sequence, producerIdEntry.lastSeq)
+  }
+
+  @Test
+  def testAcceptAppendWithSequenceGapsOnReplica(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 0, 0L, 0)
+    val outOfOrderSequence = 3
+
+    // First we ensure that an OutOfOrderSequenceException is raised when the append
comes from a client.
+    try {
+      append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true)
+      fail("Expected an OutOfOrderSequenceException to be raised.")
+    } catch {
+      case _ : OutOfOrderSequenceException =>
+      // Good!
+      case _ : Exception =>
+        fail("Expected an OutOfOrderSequenceException to be raised.")
+    }
+
+    assertEquals(0L, stateManager.activeProducers(producerId).lastSeq)
+    append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false)
+    assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq)
+  }
+
+  @Test
   def testDeleteSnapshotsBefore(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L)
@@ -673,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
                                  timestamp: Long = time.milliseconds()): (CompletedTxn, Long)
= {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch,
offset, timestamp)
     mapping.update(producerAppendInfo)
@@ -689,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite {
                      offset: Long,
                      timestamp: Long = time.milliseconds(),
                      isTransactional: Boolean = false,
-                     isLoadingFromLog: Boolean = false): Unit = {
-    val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog)
+                     isFromClient : Boolean = true): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
     producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional)
     stateManager.update(producerAppendInfo)
     stateManager.updateMapEndOffset(offset + 1)


Mime
View raw message