kafka-commits mailing list archives

From jun...@apache.org
Subject [1/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics
Date Mon, 03 Apr 2017 02:41:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1ce6aa550 -> bdf4cba04


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 768c073..a7af24e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,8 +28,7 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP
-import org.apache.kafka.common.record.{RecordBatch, _}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,7 +68,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singletonRecords("test".getBytes)
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
 
     // create a log
@@ -77,6 +76,7 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
+                      maxPidExpirationMs = 24 * 60,
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
@@ -120,6 +120,219 @@ class LogTest extends JUnitSuite {
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testNonSequentialAppend(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 2)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
+  @Test
+  def testDuplicateAppends(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val epoch: Short = 0
+
+    var seq = 0
+    // Pad the beginning of the log.
+    for (i <- 0 to 5) {
+      val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq)
+      log.append(record, assignOffsets = true)
+      seq = seq + 1
+    }
+    // Append an entry with multiple log records.
+    var record = TestUtils.records(List(
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
+    ), pid = pid, epoch = epoch, sequence = seq)
+    val multiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
+    seq = seq + 3
+
+    // Append a duplicate of the tail, where the tail entry has multiple records.
+    val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
+      multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
+    assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
+      multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
+
+    // Append a partial duplicate of the tail. This is not allowed.
+    try {
+      record = TestUtils.records(
+        List(
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
+        pid = pid, epoch = epoch, sequence = seq - 2)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a duplicate of an entry in the middle of the log. This is not allowed.
+    try {
+      record = TestUtils.records(
+        List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
+        pid = pid, epoch = epoch, sequence = 1)
+      log.append(record, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+        "in the middle of the log.")
+    } catch {
+      case e: OutOfOrderSequenceException => // Good!
+    }
+
+    // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
+    record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+      pid = pid, epoch = epoch, sequence = seq)
+    val origAppendInfo = log.append(record, assignOffsets = true)
+    val newAppendInfo = log.append(record, assignOffsets = true)
+    assertEquals("Inserted a duplicate record into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate record into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
+  }
+
+  @Test
+  def testMultiplePidsPerMemoryRecord(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a batch from a different pid.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a batch from a different pid.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a batch from a different pid.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    log.append(memoryRecords, assignOffsets = false)
+    log.flush()
+
+    val fetchedData = log.read(0, Int.MaxValue)
+
+    val origIterator = memoryRecords.batches.iterator()
+    for (batch <- fetchedData.records.batches.asScala) {
+      assertTrue(origIterator.hasNext)
+      val origEntry = origIterator.next()
+      assertEquals(origEntry.producerId, batch.producerId)
+      assertEquals(origEntry.baseOffset, batch.baseOffset)
+      assertEquals(origEntry.baseSequence, batch.baseSequence)
+    }
+  }
+
+  @Test(expected = classOf[DuplicateSequenceNumberException])
+  def testMultiplePidsWithDuplicates(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val epoch: Short = 0
+
+    val buffer = ByteBuffer.allocate(512)
+
+    var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Append a batch from a different pid.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    // Continue pid 1 with its next sequence number.
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 2L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
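+    // This batch duplicates (pid 1, sequence 1) from above and must make the append below fail.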
+    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), 1L, epoch, 1)
+    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+    builder.close()
+
+    buffer.flip()
+
+    log.append(MemoryRecords.readableRecords(buffer), assignOffsets = false)
+    // Should throw a DuplicateSequenceNumberException here.
+    fail("should have thrown a DuplicateSequenceNumberException.")
+  }
+
+  @Test(expected = classOf[ProducerFencedException])
+  def testOldProducerEpoch(): Unit = {
+    val logProps = new Properties()
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    val pid = 1L
+    val newEpoch: Short = 1
+    val oldEpoch: Short = 0
+
+    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0)
+    log.append(records, assignOffsets = true)
+
+    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = oldEpoch, sequence = 0)
+    log.append(nextRecords, assignOffsets = true)
+  }
+
   /**
   * Test for jitter in time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
@@ -167,7 +380,7 @@ class LogTest extends JUnitSuite {
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -183,7 +396,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
   }
 
@@ -196,7 +409,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -220,7 +433,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -245,7 +458,7 @@ class LogTest extends JUnitSuite {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -262,7 +475,7 @@ class LogTest extends JUnitSuite {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -290,7 +503,7 @@ class LogTest extends JUnitSuite {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -326,7 +539,7 @@ class LogTest extends JUnitSuite {
 
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -357,7 +570,7 @@ class LogTest extends JUnitSuite {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
@@ -395,7 +608,7 @@ class LogTest extends JUnitSuite {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -421,7 +634,7 @@ class LogTest extends JUnitSuite {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
@@ -457,7 +670,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -484,7 +697,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -526,7 +739,7 @@ class LogTest extends JUnitSuite {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -552,7 +765,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
@@ -578,12 +791,12 @@ class LogTest extends JUnitSuite {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -599,7 +812,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -623,7 +836,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -635,7 +848,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -662,7 +875,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -672,7 +885,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -693,7 +906,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -715,7 +928,7 @@ class LogTest extends JUnitSuite {
     }
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -842,8 +1055,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
@@ -874,8 +1087,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
@@ -885,8 +1098,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -911,8 +1124,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -952,8 +1165,8 @@ class LogTest extends JUnitSuite {
                       config,
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -967,8 +1180,8 @@ class LogTest extends JUnitSuite {
                   config,
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
-                  time.scheduler,
-                  time)
+                  scheduler = time.scheduler,
+                  time = time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
 
@@ -978,8 +1191,8 @@ class LogTest extends JUnitSuite {
                       LogConfig(),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      time.scheduler,
-                      time)
+                      scheduler = time.scheduler,
+                      time = time)
     log.append(TestUtils.singletonRecords(value = null))
     val head = log.read(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
@@ -992,8 +1205,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
     val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
@@ -1006,8 +1219,8 @@ class LogTest extends JUnitSuite {
       LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log.append(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
   }
@@ -1029,8 +1242,8 @@ class LogTest extends JUnitSuite {
                         config,
                         logStartOffset = 0L,
                         recoveryPoint = 0L,
-                        time.scheduler,
-                        time)
+                        scheduler = time.scheduler,
+                        time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
@@ -1072,8 +1285,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
     val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -1121,8 +1334,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     for (_ <- 0 until 100)
       log.append(set)
     log.close()
@@ -1221,8 +1434,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1366,8 +1579,8 @@ class LogTest extends JUnitSuite {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log
   }
 }
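
For orientation, the new LogTest cases above pin down the idempotence contract at the log layer: a sequence gap or partial overlap raises OutOfOrderSequenceException, an exact replay of the tail batch returns the original append info instead of appending again, and a batch from an older producer epoch raises ProducerFencedException. Below is a minimal sketch of that per-pid check; the names are illustrative and not the actual ProducerIdMapping API:

    import org.apache.kafka.common.errors.{DuplicateSequenceNumberException,
      OutOfOrderSequenceException, ProducerFencedException}

    // Cached state for one producer id (pid); hypothetical field names.
    case class CachedEntry(epoch: Short, firstSeq: Int, lastSeq: Int)

    def checkSequence(cached: CachedEntry, epoch: Short, firstSeq: Int, lastSeq: Int): Unit = {
      if (epoch < cached.epoch)
        // A batch from a previous epoch means an older, fenced producer instance.
        throw new ProducerFencedException(s"Epoch $epoch is older than ${cached.epoch}")
      else if (epoch == cached.epoch) {
        if (firstSeq == cached.firstSeq && lastSeq == cached.lastSeq)
          // Exact replay of the cached batch, i.e. a retry of the last request.
          throw new DuplicateSequenceNumberException(s"Duplicate batch $firstSeq..$lastSeq")
        else if (firstSeq != cached.lastSeq + 1)
          // Any gap, or partial overlap with the tail, is rejected outright.
          throw new OutOfOrderSequenceException(
            s"Expected sequence ${cached.lastSeq + 1} but got $firstSeq")
      }
      // A newer epoch is accepted and resets the expected sequence (not shown).
    }

Note that testDuplicateAppends shows the Log layer turning the exact-replay case into a successful return of the original offsets rather than surfacing the duplicate error to the caller.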

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
new file mode 100644
index 0000000..d27f0ca
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
@@ -0,0 +1,224 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class ProducerIdMappingTest extends JUnitSuite {
+  var idMappingDir: File = null
+  var config: LogConfig = null
+  var idMapping: ProducerIdMapping = null
+  val partition = new TopicPartition("test", 0)
+  val pid = 1L
+  val maxPidExpirationMs = 60 * 1000
+  val time = new MockTime
+
+  @Before
+  def setUp(): Unit = {
+    // Create configuration including number of snapshots to hold
+    val props = new Properties()
+    config = LogConfig(props)
+
+    // Create temporary directory
+    idMappingDir = TestUtils.tempDir()
+
+    // Instantiate IdMapping
+    idMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+  }
+
+  @After
+  def tearDown(): Unit = {
+    idMappingDir.listFiles().foreach(f => f.delete())
+    idMappingDir.deleteOnExit()
+  }
+
+  @Test
+  def testBasicIdMapping(): Unit = {
+    val epoch = 0.toShort
+
+    // First entry for id 0 added
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    // Second entry for id 0 added
+    checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+
+    // Duplicate sequence number (matches previous sequence number)
+    assertThrows[DuplicateSequenceNumberException] {
+      checkAndUpdate(idMapping, pid, 1, epoch, 0L, 1L)
+    }
+
+    // Invalid sequence number (greater than next expected sequence number)
+    assertThrows[OutOfOrderSequenceException] {
+      checkAndUpdate(idMapping, pid, 5, epoch, 0L, 2L)
+    }
+
+    // Change epoch
+    checkAndUpdate(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)
+
+    // Incorrect epoch
+    assertThrows[ProducerFencedException] {
+      checkAndUpdate(idMapping, pid, 0, epoch, 0L, 4L)
+    }
+  }
+
+  @Test
+  def testTakeSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1L)
+
+    // Take snapshot
+    idMapping.maybeTakeSnapshot()
+
+    // Check that file exists and it is not empty
+    assertEquals("Directory doesn't contain a single file as expected", 1, idMappingDir.list().length)
+    assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0)
+  }
+
+  @Test
+  def testRecoverFromSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, time.milliseconds)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, time.milliseconds)
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(3L, time.milliseconds)
+
+    // Entry added after recovery.
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, time.milliseconds)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testRemoveExpiredPidsOnReload(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 1)
+
+    idMapping.maybeTakeSnapshot()
+    val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(1L, 70000)
+
+    // Entry added after recovery. The pid should now be expired and absent from the mapping,
+    // so we should get an OutOfOrderSequenceException.
+    checkAndUpdate(recoveredMapping, pid, 2, epoch, 2L, 70001)
+  }
+
+  @Test
+  def testRemoveOldSnapshot(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 1L, 2L)
+
+    idMapping.maybeTakeSnapshot()
+
+    checkAndUpdate(idMapping, pid, 2, epoch, 2L, 3L)
+
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}",
+               1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testSkipSnapshotIfOffsetUnchanged(): Unit = {
+    val epoch = 0.toShort
+    checkAndUpdate(idMapping, pid, 0, epoch, 0L, 0L)
+
+    idMapping.maybeTakeSnapshot()
+
+    // nothing changed so there should be no new snapshot
+    idMapping.maybeTakeSnapshot()
+
+    assertEquals(s"number of snapshot files is incorrect: ${idMappingDir.listFiles().length}",
+      1, idMappingDir.listFiles().length)
+  }
+
+  @Test
+  def testStartOffset(): Unit = {
+    val epoch = 0.toShort
+    val pid2 = 2L
+    checkAndUpdate(idMapping, pid2, 0, epoch, 0L, 1L)
+    checkAndUpdate(idMapping, pid, 0, epoch, 1L, 2L)
+    checkAndUpdate(idMapping, pid, 1, epoch, 2L, 3L)
+    checkAndUpdate(idMapping, pid, 2, epoch, 3L, 4L)
+    idMapping.maybeTakeSnapshot()
+
+    intercept[OutOfOrderSequenceException] {
+      val recoveredMapping = new ProducerIdMapping(config, partition, idMappingDir, maxPidExpirationMs)
+      recoveredMapping.truncateAndReload(1L, time.milliseconds)
+      checkAndUpdate(recoveredMapping, pid2, 1, epoch, 4L, 5L)
+    }
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testPidExpirationTimeout() {
+    val epoch = 5.toShort
+    val sequence = 37
+    checkAndUpdate(idMapping, pid, sequence, epoch, 1L)
+    time.sleep(maxPidExpirationMs + 1)
+    idMapping.checkForExpiredPids(time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 1L)
+  }
+
+  @Test
+  def testLoadPid() {
+    val epoch = 5.toShort
+    val sequence = 37
+    val createTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), time.milliseconds)
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testLoadIgnoresExpiredPids() {
+    val epoch = 5.toShort
+    val sequence = 37
+
+    val createTimeMs = time.milliseconds
+    time.sleep(maxPidExpirationMs + 1)
+    val loadTimeMs = time.milliseconds
+    idMapping.load(pid, ProducerIdEntry(epoch, sequence, 0L, 1, createTimeMs), loadTimeMs)
+
+    // entry wasn't loaded, so this should fail
+    checkAndUpdate(idMapping, pid, sequence + 1, epoch, 2L)
+  }
+
+  private def checkAndUpdate(mapping: ProducerIdMapping,
+                             pid: Long,
+                             seq: Int,
+                             epoch: Short,
+                             lastOffset: Long,
+                             timestamp: Long = time.milliseconds()): Unit = {
+    val numRecords = 1
+    val incomingPidEntry = ProducerIdEntry(epoch, seq, lastOffset, numRecords, timestamp)
+    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
+    producerAppendInfo.append(incomingPidEntry)
+    mapping.update(producerAppendInfo)
+  }
+
+}
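
The expiration tests above (testRemoveExpiredPidsOnReload, testPidExpirationTimeout, testLoadIgnoresExpiredPids) depend on each cached entry carrying a timestamp and being evicted once it is older than maxPidExpirationMs; the next append for that pid then fails the sequence check because the mapping no longer has an entry for it. A sketch of that eviction under those assumptions (the real ProducerIdMapping internals may differ):

    import scala.collection.mutable

    // Hypothetical cache keyed by producer id.
    case class PidEntry(epoch: Short, lastSeq: Int, lastOffset: Long, timestampMs: Long)

    class PidCache(maxPidExpirationMs: Int) {
      private val entries = mutable.Map.empty[Long, PidEntry]

      def checkForExpiredPids(nowMs: Long): Unit =
        // Drop every pid whose last append falls outside the retention window.
        entries.retain((_, entry) => nowMs - entry.timestampMs < maxPidExpirationMs)
    }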

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index b6e3607..f868032 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -201,6 +201,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val start = System.currentTimeMillis()
 
     //Start the new broker (and hence start replicating)
+    debug("Starting new broker")
     brokers = brokers :+ createServer(fromProps(createBrokerConfig(101, zkConnect)))
     waitForOffsetsToMatch(msgCount, 0, 101)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9ae7195..8766855 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -937,9 +937,10 @@ object TestUtils extends Logging {
                    flushRecoveryOffsetCheckpointMs = 10000L,
                    flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
+                   maxPidExpirationMs = 60 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
-                   brokerState = new BrokerState())
+                   brokerState = BrokerState())
   }
 
   @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 918c4b5..db62020 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -40,6 +40,21 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
   }
 
+  // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it
+  @Test
+  def testPersistentSequentialPath() {
+    // Given an existing path
+    zkUtils.createPersistentPath(path)
+
+    var result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000000", result)
+
+    result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
+
+    assertEquals("/path/sequence_0000000001", result)
+  }
+
   @Test
   def testAbortedConditionalDeletePath() {
     // Given an existing path that gets updated
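
ZkUtils.createSequentialPersistentPath, exercised in testPersistentSequentialPath above, returns the created path with a zero-padded, monotonically increasing counter appended by ZooKeeper. A consumer such as the PID manager mentioned in the test comment could recover that counter from the returned path; a hedged sketch, assuming a zkUtils instance as in the test:

    // ZooKeeper sequential nodes append a ten-digit, zero-padded counter.
    val created = zkUtils.createSequentialPersistentPath("/path/sequence_")
    val counter = created.substring(created.lastIndexOf('_') + 1).toLong
    // e.g. "/path/sequence_0000000001" yields 1L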

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 859e3c4..3cf3abd 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -15,25 +15,22 @@
 
 import json
 import os
-import signal
 import time
 
 from ducktape.services.background_thread import BackgroundThreadService
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
-from kafkatest.utils.remote_account import line_count
 
 class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableProducer for use in
-    system testing. 
+    system testing.
 
     NOTE: this class should be treated as a PUBLIC API. Downstream users use
-    this service both directly and through class extension, so care must be 
+    this service both directly and through class extension, so care must be
     taken to ensure compatibility.
     """
 
@@ -59,7 +56,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
-                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO"):
+                 stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -92,6 +89,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.acks = acks
         self.stop_timeout_sec = stop_timeout_sec
         self.request_timeout_sec = request_timeout_sec
+        self.enable_idempotence = enable_idempotence
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -124,6 +122,12 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             producer_prop_file += "\nacks=%s\n" % self.acks
 
         producer_prop_file += "\nrequest.timeout.ms=%d\n" % (self.request_timeout_sec * 1000)
+        if self.enable_idempotence:
+            self.logger.info("Setting up an idempotent producer")
+            producer_prop_file += "\nmax.in.flight.requests.per.connection=1\n"
+            producer_prop_file += "\nretries=50\n"
+            producer_prop_file += "\nenable.idempotence=true\n"
+
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
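
The three properties appended above for idempotence mirror what a client would set directly. For reference, a minimal Scala sketch of the same configuration against the Java producer API (the broker address and topic are illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // The same three settings VerifiableProducer writes when enable_idempotence is set.
    props.put("enable.idempotence", "true")
    props.put("max.in.flight.requests.per.connection", "1")
    props.put("retries", "50")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord("test_topic", "key", "value"))
    producer.close()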

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 3e17d56..5d96d7b 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -104,11 +104,12 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
+                                  topics={self.topic: {
+                                      "partitions": 3,
+                                      "replication-factor": 3,
+                                      'configs': {"min.insync.replicas": 2}}
+                                  })
         self.producer_throughput = 1000
         self.num_producers = 1
         self.num_consumers = 1
@@ -123,6 +124,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
+            security_protocol=["PLAINTEXT"],
+            enable_idempotence=[True])
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
@@ -133,7 +138,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @parametrize(failure_mode="hard_bounce",
             broker_type="leader",
             security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol,
+                                             broker_type, client_sasl_mechanism="GSSAPI",
+                                             interbroker_sasl_mechanism="GSSAPI",
+                                             enable_idempotence=False):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -152,8 +160,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.kafka.client_sasl_mechanism = client_sasl_mechanism
         self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.enable_idempotence = enable_idempotence
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput, enable_idempotence=enable_idempotence)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
-        
         self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index cad9150..079305c 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -42,6 +42,7 @@ class ProduceConsumeValidateTest(Test):
         # producer begins producing messages, in which case we will miss the
         # initial set of messages and get spurious test failures.
         self.consumer_init_timeout_sec = 0
+        self.enable_idempotence = False
 
     def setup_producer_and_consumer(self):
         raise NotImplementedError("Subclasses should implement this")
@@ -67,7 +68,7 @@ class ProduceConsumeValidateTest(Test):
             remaining_time = self.consumer_init_timeout_sec - (end - start)
             if remaining_time < 0 :
                 remaining_time = 0
-            if self.consumer.new_consumer is True:
+            if self.consumer.new_consumer:
                 wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
                            timeout_sec=remaining_time,
                            err_msg="Consumer process took more than %d s to have partitions assigned" %\
@@ -167,9 +168,17 @@ class ProduceConsumeValidateTest(Test):
             msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
 
 
+        if self.enable_idempotence:
+            self.logger.info("Ran a test with idempotence enabled. We expect no duplicates")
+        else:
+            self.logger.info("Ran a test with idempotence disabled.")
+
         # Are there duplicates?
         if len(set(consumed)) != len(consumed):
-            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % abs(len(set(consumed)) - len(consumed))
+            num_duplicates = abs(len(set(consumed)) - len(consumed))
+            msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
+            if self.enable_idempotence:
+                assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates
 
         # Collect all logs if validation fails
         if not success:

