kafka-commits mailing list archives

From: j...@apache.org
Subject: [1/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98
Date: Sat, 06 May 2017 18:51:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 29994dd10 -> e71dce89c


http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
new file mode 100644
index 0000000..e8c918d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -0,0 +1,562 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.log
+
+import java.io.File
+
+import kafka.server.LogOffsetMetadata
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class ProducerStateManagerTest extends JUnitSuite {
+  var idMappingDir: File = null
+  var idMapping: ProducerStateManager = null
+  val partition = new TopicPartition("test", 0)
+  val pid = 1L
+  val maxPidExpirationMs = 60 * 1000
+  val time = new MockTime
+
+  @Before
+  def setUp(): Unit = {
+    idMappingDir = TestUtils.tempDir()
+    idMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+  }
+
+  @After
+  def tearDown(): Unit = {
+    Utils.delete(idMappingDir)
+  }
+
+  @Test
+  def testBasicIdMapping(): Unit = {
+    val epoch = 0.toShort
+
+    // First entry for id 0 added
+    append(idMapping, pid, 0, epoch, 0L, 0L)
+
+    // Second entry for id 0 added
+    append(idMapping, pid, 1, epoch, 0L, 1L)
+
+    // Duplicate sequence number (matches previous sequence number)
+    assertThrows[DuplicateSequenceNumberException] {
+      append(idMapping, pid, 1, epoch, 0L, 1L)
+    }
+
+    // Invalid sequence number (greater than next expected sequence number)
+    assertThrows[OutOfOrderSequenceException] {
+      append(idMapping, pid, 5, epoch, 0L, 2L)
+    }
+
+    // Change epoch
+    append(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)
+
+    // Incorrect epoch
+    assertThrows[ProducerFencedException] {
+      append(idMapping, pid, 0, epoch, 0L, 4L)
+    }
+  }
+
+  @Test
+  def testNoValidationOnFirstEntryWhenLoadingLog(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 16
+    val offset = 735L
+    append(idMapping, pid, sequence, epoch, offset, isLoadingFromLog = true)
+
+    val maybeLastEntry = idMapping.lastEntry(pid)
+    assertTrue(maybeLastEntry.isDefined)
+
+    val lastEntry = maybeLastEntry.get
+    assertEquals(epoch, lastEntry.producerEpoch)
+    assertEquals(sequence, lastEntry.firstSeq)
+    assertEquals(sequence, lastEntry.lastSeq)
+    assertEquals(offset, lastEntry.lastOffset)
+    assertEquals(offset, lastEntry.firstOffset)
+  }
+
+  @Test
+  def testControlRecordBumpsEpoch(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L)
+
+    val bumpedEpoch = 1.toShort
+    val (completedTxn, lastStableOffset) = appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)
+    assertEquals(1L, completedTxn.firstOffset)
+    assertEquals(1L, completedTxn.lastOffset)
+    assertEquals(2L, lastStableOffset)
+    assertTrue(completedTxn.isAborted)
+    assertEquals(pid, completedTxn.producerId)
+
+    val maybeLastEntry = idMapping.lastEntry(pid)
+    assertTrue(maybeLastEntry.isDefined)
+
+    val lastEntry = maybeLastEntry.get
+    assertEquals(bumpedEpoch, lastEntry.producerEpoch)
+    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
+    assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
+
+    // should be able to append with the new epoch if we start at sequence 0
+    append(idMapping, pid, 0, bumpedEpoch, 2L)
+    assertEquals(Some(0), idMapping.lastEntry(pid).map(_.firstSeq))
+  }
+
+  @Test
+  def testTxnFirstOffsetMetadataCached(): Unit = {
+    val producerEpoch = 0.toShort
+    val offset = 992342L
+    val seq = 0
+    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
+
+    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
+      relativePositionInSegment = 234224)
+    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+    idMapping.update(producerAppendInfo)
+
+    assertEquals(Some(logOffsetMetadata), idMapping.firstUnstableOffset)
+  }
+
+  @Test
+  def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
+    val producerEpoch = 0.toShort
+    val offset = 992342L
+    val seq = 0
+    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
+
+    // use some other offset to simulate a follower append where the log offset metadata won't typically
+    // match any of the transaction first offsets
+    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
+      relativePositionInSegment = 234224)
+    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+    idMapping.update(producerAppendInfo)
+
+    assertEquals(Some(LogOffsetMetadata(offset)), idMapping.firstUnstableOffset)
+  }
+
+  @Test
+  def updateProducerTransactionState(): Unit = {
+    val producerEpoch = 0.toShort
+    val coordinatorEpoch = 15
+    val offset = 9L
+    append(idMapping, pid, 0, producerEpoch, offset)
+
+    val appendInfo = new ProducerAppendInfo(pid, idMapping.lastEntry(pid), loadingFromLog = false)
+    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
+    var lastEntry = appendInfo.lastEntry
+    assertEquals(producerEpoch, lastEntry.producerEpoch)
+    assertEquals(1, lastEntry.firstSeq)
+    assertEquals(5, lastEntry.lastSeq)
+    assertEquals(16L, lastEntry.firstOffset)
+    assertEquals(20L, lastEntry.lastOffset)
+    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
+    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+
+    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true)
+    lastEntry = appendInfo.lastEntry
+    assertEquals(producerEpoch, lastEntry.producerEpoch)
+    assertEquals(6, lastEntry.firstSeq)
+    assertEquals(10, lastEntry.lastSeq)
+    assertEquals(26L, lastEntry.firstOffset)
+    assertEquals(30L, lastEntry.lastOffset)
+    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
+    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+
+    val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+    val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
+    assertEquals(pid, completedTxn.producerId)
+    assertEquals(16L, completedTxn.firstOffset)
+    assertEquals(40L, completedTxn.lastOffset)
+    assertFalse(completedTxn.isAborted)
+
+    lastEntry = appendInfo.lastEntry
+    assertEquals(producerEpoch, lastEntry.producerEpoch)
+    assertEquals(10, lastEntry.firstSeq)
+    assertEquals(10, lastEntry.lastSeq)
+    assertEquals(40L, lastEntry.firstOffset)
+    assertEquals(40L, lastEntry.lastOffset)
+    assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
+    assertEquals(None, lastEntry.currentTxnFirstOffset)
+    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testOutOfSequenceAfterControlRecordEpochBump(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L)
+    append(idMapping, pid, 1, epoch, 1L)
+
+    val bumpedEpoch = 1.toShort
+    appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)
+
+    // next append is invalid since we expect the sequence to be reset
+    append(idMapping, pid, 2, bumpedEpoch, 2L)
+  }
+
+  @Test(expected = classOf[InvalidTxnStateException])
+  def testNonTransactionalAppendWithOngoingTransaction(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L, isTransactional = true)
+    append(idMapping, pid, 1, epoch, 1L, isTransactional = false)
+  }
+
+  @Test
+  def testTakeSnapshot(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L, 0L)
+    append(idMapping, pid, 1, epoch, 1L, 1L)
+
+    // Take snapshot
+    idMapping.takeSnapshot()
+
+    // Check that file exists and it is not empty
+    assertEquals("Directory doesn't contain a single file as expected", 1, idMappingDir.list().length)
+    assertTrue("Snapshot file is empty", idMappingDir.list().head.length > 0)
+  }
+
+  @Test
+  def testRecoverFromSnapshot(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L)
+    append(idMapping, pid, 1, epoch, 1L)
+
+    idMapping.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
+
+    // entry added after recovery
+    append(recoveredMapping, pid, 2, epoch, 2L)
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testRemoveExpiredPidsOnReload(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L, 0)
+    append(idMapping, pid, 1, epoch, 1L, 1)
+
+    idMapping.takeSnapshot()
+    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 1L, 70000)
+
+    // This entry is appended after recovery. The pid should have expired by now and no longer exist in the
+    // pid mapping, so we should get an OutOfOrderSequenceException.
+    append(recoveredMapping, pid, 2, epoch, 2L, 70001)
+  }
+
+  @Test
+  def testDeleteSnapshotsBefore(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L)
+    append(idMapping, pid, 1, epoch, 1L)
+    idMapping.takeSnapshot()
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(2), currentSnapshotOffsets)
+
+    append(idMapping, pid, 2, epoch, 2L)
+    idMapping.takeSnapshot()
+    assertEquals(2, idMappingDir.listFiles().length)
+    assertEquals(Set(2, 3), currentSnapshotOffsets)
+
+    idMapping.deleteSnapshotsBefore(3L)
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(3), currentSnapshotOffsets)
+
+    idMapping.deleteSnapshotsBefore(4L)
+    assertEquals(0, idMappingDir.listFiles().length)
+    assertEquals(Set(), currentSnapshotOffsets)
+  }
+
+  @Test
+  def testTruncate(): Unit = {
+    val epoch = 0.toShort
+
+    append(idMapping, pid, 0, epoch, 0L)
+    append(idMapping, pid, 1, epoch, 1L)
+    idMapping.takeSnapshot()
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(2), currentSnapshotOffsets)
+
+    append(idMapping, pid, 2, epoch, 2L)
+    idMapping.takeSnapshot()
+    assertEquals(2, idMappingDir.listFiles().length)
+    assertEquals(Set(2, 3), currentSnapshotOffsets)
+
+    idMapping.truncate()
+
+    assertEquals(0, idMappingDir.listFiles().length)
+    assertEquals(Set(), currentSnapshotOffsets)
+
+    append(idMapping, pid, 0, epoch, 0L)
+    idMapping.takeSnapshot()
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(1), currentSnapshotOffsets)
+  }
+
+  @Test
+  def testFirstUnstableOffsetAfterTruncation(): Unit = {
+    val epoch = 0.toShort
+    val sequence = 0
+
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
+    idMapping.takeSnapshot()
+
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 105)
+    idMapping.onHighWatermarkUpdated(106)
+    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
+    idMapping.takeSnapshot()
+
+    append(idMapping, pid, sequence + 1, epoch, offset = 106)
+    idMapping.truncateAndReload(0L, 106, time.milliseconds())
+    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    idMapping.truncateAndReload(0L, 100L, time.milliseconds())
+    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
+  }
+
+  @Test
+  def testFirstUnstableOffsetAfterEviction(): Unit = {
+    val epoch = 0.toShort
+    val sequence = 0
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
+    append(idMapping, 2L, 0, epoch, offset = 106, isTransactional = true)
+    idMapping.evictUnretainedProducers(100)
+    assertEquals(Some(106), idMapping.firstUnstableOffset.map(_.messageOffset))
+  }
+
+  @Test
+  def testEvictUnretainedPids(): Unit = {
+    val epoch = 0.toShort
+
+    append(idMapping, pid, 0, epoch, 0L)
+    append(idMapping, pid, 1, epoch, 1L)
+    idMapping.takeSnapshot()
+
+    val anotherPid = 2L
+    append(idMapping, anotherPid, 0, epoch, 2L)
+    append(idMapping, anotherPid, 1, epoch, 3L)
+    idMapping.takeSnapshot()
+    assertEquals(Set(2, 4), currentSnapshotOffsets)
+
+    idMapping.evictUnretainedProducers(2)
+    assertEquals(Set(4), currentSnapshotOffsets)
+    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
+    assertEquals(None, idMapping.lastEntry(pid))
+
+    val maybeEntry = idMapping.lastEntry(anotherPid)
+    assertTrue(maybeEntry.isDefined)
+    assertEquals(3L, maybeEntry.get.lastOffset)
+
+    idMapping.evictUnretainedProducers(3)
+    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
+    assertEquals(Set(4), currentSnapshotOffsets)
+    assertEquals(4, idMapping.mapEndOffset)
+
+    idMapping.evictUnretainedProducers(5)
+    assertEquals(Set(), idMapping.activeProducers.keySet)
+    assertEquals(Set(), currentSnapshotOffsets)
+    assertEquals(5, idMapping.mapEndOffset)
+  }
+
+  @Test
+  def testSkipSnapshotIfOffsetUnchanged(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, 0L, 0L)
+
+    idMapping.takeSnapshot()
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(1), currentSnapshotOffsets)
+
+    // nothing changed so there should be no new snapshot
+    idMapping.takeSnapshot()
+    assertEquals(1, idMappingDir.listFiles().length)
+    assertEquals(Set(1), currentSnapshotOffsets)
+  }
+
+  @Test
+  def testStartOffset(): Unit = {
+    val epoch = 0.toShort
+    val pid2 = 2L
+    append(idMapping, pid2, 0, epoch, 0L, 1L)
+    append(idMapping, pid, 0, epoch, 1L, 2L)
+    append(idMapping, pid, 1, epoch, 2L, 3L)
+    append(idMapping, pid, 2, epoch, 3L, 4L)
+    idMapping.takeSnapshot()
+
+    intercept[OutOfOrderSequenceException] {
+      val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+      recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds)
+      append(recoveredMapping, pid2, 1, epoch, 4L, 5L)
+    }
+  }
+
+  @Test(expected = classOf[OutOfOrderSequenceException])
+  def testPidExpirationTimeout(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 37
+    append(idMapping, pid, sequence, epoch, 1L)
+    time.sleep(maxPidExpirationMs + 1)
+    idMapping.removeExpiredProducers(time.milliseconds)
+    append(idMapping, pid, sequence + 1, epoch, 1L)
+  }
+
+  @Test
+  def testFirstUnstableOffset(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 0
+
+    assertEquals(None, idMapping.firstUndecidedOffset)
+
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
+    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    val anotherPid = 2L
+    append(idMapping, anotherPid, sequence, epoch, offset = 105, isTransactional = true)
+    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
+    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 109)
+    assertEquals(Some(105L), idMapping.firstUndecidedOffset)
+    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    idMapping.onHighWatermarkUpdated(100L)
+    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    idMapping.onHighWatermarkUpdated(110L)
+    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    appendEndTxnMarker(idMapping, anotherPid, epoch, ControlRecordType.ABORT, offset = 112)
+    assertEquals(None, idMapping.firstUndecidedOffset)
+    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))
+
+    idMapping.onHighWatermarkUpdated(113L)
+    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
+  }
+
+  @Test
+  def testProducersWithOngoingTransactionsDontExpire(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 0
+
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
+
+    time.sleep(maxPidExpirationMs + 1)
+    idMapping.removeExpiredProducers(time.milliseconds)
+
+    assertTrue(idMapping.lastEntry(pid).isDefined)
+    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
+
+    idMapping.removeExpiredProducers(time.milliseconds)
+    assertTrue(idMapping.lastEntry(pid).isDefined)
+  }
+
+  @Test(expected = classOf[ProducerFencedException])
+  def testOldEpochForControlRecord(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 0
+
+    assertEquals(None, idMapping.firstUndecidedOffset)
+
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    appendEndTxnMarker(idMapping, pid, 3.toShort, ControlRecordType.COMMIT, offset=100)
+  }
+
+  @Test
+  def testCoordinatorFencing(): Unit = {
+    val epoch = 5.toShort
+    val sequence = 0
+
+    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
+
+    val lastEntry = idMapping.lastEntry(pid)
+    assertEquals(Some(1), lastEntry.map(_.coordinatorEpoch))
+
+    // writing with the current epoch is allowed
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1)
+
+    // bumping the epoch is allowed
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2)
+
+    // old epochs are not allowed
+    try {
+      appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1)
+      fail("Expected coordinator to be fenced")
+    } catch {
+      case e: TransactionCoordinatorFencedException =>
+    }
+  }
+
+  @Test(expected = classOf[TransactionCoordinatorFencedException])
+  def testCoordinatorFencedAfterReload(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, 0, epoch, offset = 99, isTransactional = true)
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
+    idMapping.takeSnapshot()
+
+    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
+    recoveredMapping.truncateAndReload(0L, 2L, 70000)
+
+    // append from old coordinator should be rejected
+    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
+  }
+
+  private def appendEndTxnMarker(mapping: ProducerStateManager,
+                                 pid: Long,
+                                 epoch: Short,
+                                 controlType: ControlRecordType,
+                                 offset: Long,
+                                 coordinatorEpoch: Int = 0,
+                                 timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
+    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
+    val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
+    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, epoch, offset, timestamp)
+    mapping.update(producerAppendInfo)
+    val lastStableOffset = mapping.completeTxn(completedTxn)
+    mapping.updateMapEndOffset(offset + 1)
+    (completedTxn, lastStableOffset)
+  }
+
+  private def append(mapping: ProducerStateManager,
+                     pid: Long,
+                     seq: Int,
+                     epoch: Short,
+                     offset: Long,
+                     timestamp: Long = time.milliseconds(),
+                     isTransactional: Boolean = false,
+                     isLoadingFromLog: Boolean = false): Unit = {
+    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid), isLoadingFromLog)
+    producerAppendInfo.append(epoch, seq, seq, timestamp, offset, isTransactional)
+    mapping.update(producerAppendInfo)
+    mapping.updateMapEndOffset(offset + 1)
+  }
+
+  private def currentSnapshotOffsets =
+    idMappingDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet
+
+}
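
Note on the validation exercised above: these tests check that a producer append is
accepted only when its epoch is current and its sequence number directly follows the
last appended sequence. The following is a minimal sketch of that rule, for
illustration only; the object and method names are hypothetical and this is not the
actual ProducerStateManager code.

object SequenceValidationSketch {
  // The per-producer state that matters for append validation.
  case class LastEntry(producerEpoch: Short, lastSeq: Int)

  sealed trait Result
  case object Ok extends Result          // append accepted
  case object Duplicate extends Result   // DuplicateSequenceNumberException in the tests
  case object OutOfOrder extends Result  // OutOfOrderSequenceException in the tests
  case object Fenced extends Result      // ProducerFencedException in the tests

  def validate(last: Option[LastEntry], epoch: Short, seq: Int): Result = last match {
    case None =>
      Ok // no prior entry: accepted here (the real code skips validation when loading from the log)
    case Some(entry) if epoch < entry.producerEpoch =>
      Fenced // a newer producer instance has bumped the epoch
    case Some(entry) if epoch > entry.producerEpoch =>
      if (seq == 0) Ok else OutOfOrder // a bumped epoch must restart at sequence 0
    case Some(entry) if seq == entry.lastSeq =>
      Duplicate // exact retry of the previous batch
    case Some(entry) if seq == entry.lastSeq + 1 =>
      Ok // the expected next sequence
    case Some(_) =>
      OutOfOrder // a gap or regression in the sequence
  }
}

testBasicIdMapping maps onto this sketch directly: sequences 0 and 1 succeed, a retry
of sequence 1 is a duplicate, jumping to sequence 5 is out of order, a bumped epoch
must start again at sequence 0, and the old epoch is fenced afterwards.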

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
new file mode 100644
index 0000000..4546818
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.File
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+class TransactionIndexTest extends JUnitSuite {
+  var file: File = _
+  var index: TransactionIndex = _
+  val offset = 0L
+
+  @Before
+  def setup(): Unit = {
+    file = TestUtils.tempFile()
+    index = new TransactionIndex(offset, file)
+  }
+
+  @After
+  def teardown(): Unit = {
+    index.close()
+  }
+
+  @Test
+  def testPositionSetCorrectlyWhenOpened(): Unit = {
+    val abortedTxns = List(
+      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
+      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
+      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
+      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+    abortedTxns.foreach(index.append)
+    index.close()
+
+    val reopenedIndex = new TransactionIndex(0L, file)
+    val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
+    reopenedIndex.append(anotherAbortedTxn)
+    assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testSanityCheck(): Unit = {
+    val abortedTxns = List(
+      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
+      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
+      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
+      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+    abortedTxns.foreach(index.append)
+    index.close()
+
+    // open the index with a different starting offset to fake invalid data
+    val reopenedIndex = new TransactionIndex(100L, file)
+    reopenedIndex.sanityCheck()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testLastOffsetMustIncrease(): Unit = {
+    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
+    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 15, lastStableOffset = 11))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testLastOffsetCannotDecrease(): Unit = {
+    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
+    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11))
+  }
+
+  @Test
+  def testCollectAbortedTransactions(): Unit = {
+    val abortedTxns = List(
+      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
+      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
+      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
+      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+
+    abortedTxns.foreach(index.append)
+
+    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
+
+    var result = index.collectAbortedTxns(0L, 100L)
+    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertFalse(result.isComplete)
+
+    result = index.collectAbortedTxns(0L, 32)
+    assertEquals(abortedTransactions.take(3), result.abortedTransactions)
+    assertTrue(result.isComplete)
+
+    result = index.collectAbortedTxns(0L, 35)
+    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertTrue(result.isComplete)
+
+    result = index.collectAbortedTxns(10, 35)
+    assertEquals(abortedTransactions, result.abortedTransactions)
+    assertTrue(result.isComplete)
+
+    result = index.collectAbortedTxns(11, 35)
+    assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
+    assertTrue(result.isComplete)
+
+    result = index.collectAbortedTxns(20, 41)
+    assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
+    assertFalse(result.isComplete)
+  }
+
+  @Test
+  def testTruncate(): Unit = {
+    val abortedTxns = List(
+      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
+      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
+      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
+      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
+    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
+
+    abortedTxns.foreach(index.append)
+
+    index.truncateTo(51)
+    assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
+
+    index.truncateTo(50)
+    assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
+
+    index.truncate()
+    assertEquals(List.empty[AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
+  }
+
+  @Test
+  def testAbortedTxnSerde(): Unit = {
+    val pid = 983493L
+    val firstOffset = 137L
+    val lastOffset = 299L
+    val lastStableOffset = 200L
+
+    val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
+    assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
+    assertEquals(pid, abortedTxn.producerId)
+    assertEquals(firstOffset, abortedTxn.firstOffset)
+    assertEquals(lastOffset, abortedTxn.lastOffset)
+    assertEquals(lastStableOffset, abortedTxn.lastStableOffset)
+  }
+
+  @Test
+  def testRenameIndex(): Unit = {
+    val renamed = TestUtils.tempFile()
+    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))
+
+    index.renameTo(renamed)
+    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))
+
+    val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
+    assertEquals(2, abortedTxns.size)
+    assertEquals(0, abortedTxns(0).firstOffset)
+    assertEquals(5, abortedTxns(1).firstOffset)
+  }
+
+}
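
Note on the collectAbortedTxns expectations above: the asserted results are consistent
with a linear scan that returns every aborted transaction overlapping the fetch range
and reports the search complete once an entry's last stable offset reaches the upper
bound. A sketch of that rule under those assumptions (illustrative names, not the
actual TransactionIndex code):

import scala.collection.mutable.ListBuffer

object TxnIndexScanSketch {
  case class AbortedTxnSketch(producerId: Long, firstOffset: Long,
                              lastOffset: Long, lastStableOffset: Long)

  case class SearchResult(abortedTxns: List[AbortedTxnSketch], isComplete: Boolean)

  def collect(entries: Seq[AbortedTxnSketch], fetchOffset: Long,
              upperBoundOffset: Long): SearchResult = {
    val collected = ListBuffer.empty[AbortedTxnSketch]
    for (txn <- entries) {
      // include any abort whose offset range overlaps [fetchOffset, upperBoundOffset)
      if (txn.lastOffset >= fetchOffset && txn.firstOffset < upperBoundOffset)
        collected += txn
      // once the last stable offset reaches the upper bound, later entries cannot affect the fetch
      if (txn.lastStableOffset >= upperBoundOffset)
        return SearchResult(collected.toList, isComplete = true)
    }
    SearchResult(collected.toList, isComplete = false)
  }
}

Checked against testCollectAbortedTransactions: a scan with upper bound 32 collects the
first three aborts, then halts at the fourth entry (last stable offset 40 >= 32) and
reports isComplete = true, while a scan over (20, 41) never sees a last stable offset
at or beyond 41 and reports isComplete = false.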

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 5dfcb63..415027c 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -18,9 +18,8 @@
 package kafka.server
 
 import java.io.File
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, Random}
-import java.lang.{Long => JLong}
 
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
@@ -32,13 +31,10 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-import scala.collection.JavaConverters._
 
 class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random()
@@ -239,9 +235,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   def testFetchOffsetsBeforeWithChangingSegmentSize() {
     val log = EasyMock.niceMock(classOf[Log])
     val logSegment = EasyMock.niceMock(classOf[LogSegment])
-    EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Long] {
-      private val value = new AtomicLong(0)
-      def answer: Long = value.getAndIncrement()
+    EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
+      private val value = new AtomicInteger(0)
+      def answer: Int = value.getAndIncrement()
     })
     EasyMock.replay(logSegment)
     val logSegments = Seq(logSegment)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index d6b1649..9f7a47a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -24,10 +24,11 @@ import kafka.log.Log
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
+import org.apache.kafka.common.requests.IsolationLevel
 import org.junit.Assert._
 import org.junit.{After, Test}
 
@@ -152,14 +153,14 @@ class ReplicaManagerQuotasTest {
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
 
     //if we ask for len 1 return a message
-    expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn(
+    expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), anyObject())).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, record)
       )).anyTimes()
 
     //if we ask for len = 0, return 0 messages
-    expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject())).andReturn(
+    expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), anyObject())).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.EMPTY

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e00c142..4886b94 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import TestUtils.createBroker
@@ -29,7 +28,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
+import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -109,6 +108,7 @@ class ReplicaManagerTest {
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
+        isFromClient = true,
         entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE,
           new SimpleRecord("first message".getBytes))),
         responseCallback = callback)
@@ -166,6 +166,7 @@ class ReplicaManagerTest {
         timeout = 1000,
         requiredAcks = -1,
         internalTopicsAllowed = false,
+        isFromClient = true,
         entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE,
           new SimpleRecord("first message".getBytes()))),
         responseCallback = produceCallback)
@@ -178,7 +179,8 @@ class ReplicaManagerTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
-        responseCallback = fetchCallback)
+        responseCallback = fetchCallback,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
@@ -192,7 +194,133 @@ class ReplicaManagerTest {
       rm.shutdown(checkpointHW = false)
     }
   }
-  
+
+  @Test
+  def testReadCommittedFetchLimitedAtLSO(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("broker.id", Int.box(0))
+    val config = KafkaConfig.fromProps(props)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+    EasyMock.replay(metadataCache)
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
+
+    try {
+      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
+
+      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0)
+
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
+      rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
+
+      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) =
+        responseStatus.values.foreach { status =>
+          assertEquals(Errors.NONE, status.error)
+        }
+
+      val producerId = 234L
+      val epoch = 5.toShort
+
+      // write a few batches as part of a transaction
+      val numRecords = 3
+      for (sequence <- 0 until numRecords) {
+        val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence,
+          new SimpleRecord(s"message $sequence".getBytes))
+        rm.appendRecords(
+          timeout = 1000,
+          requiredAcks = -1,
+          internalTopicsAllowed = false,
+          isFromClient = true,
+          entriesPerPartition = Map(new TopicPartition(topic, 0) -> records),
+          responseCallback = produceCallback)
+      }
+
+      var fetchCallbackFired = false
+      var fetchError = Errors.NONE
+      var fetchedRecords: Records = null
+      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
+        fetchError = responseStatus.map(_._2).head.error
+        fetchedRecords = responseStatus.map(_._2).head.records
+        fetchCallbackFired = true
+      }
+
+      def fetchMessages(fetchInfos: Seq[(TopicPartition, PartitionData)],
+                        isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = {
+        rm.fetchMessages(
+          timeout = 1000,
+          replicaId = 1,
+          fetchMinBytes = 0,
+          fetchMaxBytes = Int.MaxValue,
+          hardMaxBytesLimit = false,
+          fetchInfos = fetchInfos,
+          responseCallback = fetchCallback,
+          isolationLevel = isolationLevel)
+      }
+
+      // fetch as follower to advance the high watermark
+      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)),
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+
+      // fetch should return empty since LSO should be stuck at 0
+      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        isolationLevel = IsolationLevel.READ_COMMITTED)
+
+      assertTrue(fetchCallbackFired)
+      assertEquals(Errors.NONE, fetchError)
+      assertTrue(fetchedRecords.batches.asScala.isEmpty)
+      fetchCallbackFired = false
+
+      // now commit the transaction
+      val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+      val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker)
+      rm.appendRecords(
+        timeout = 1000,
+        requiredAcks = -1,
+        internalTopicsAllowed = false,
+        isFromClient = false,
+        entriesPerPartition = Map(new TopicPartition(topic, 0) -> commitRecordBatch),
+        responseCallback = produceCallback)
+
+      // the LSO has advanced, but the appended commit marker has not been replicated, so
+      // none of the data from the transaction should be visible yet
+      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        isolationLevel = IsolationLevel.READ_COMMITTED)
+
+      assertTrue(fetchCallbackFired)
+      assertEquals(Errors.NONE, fetchError)
+      assertTrue(fetchedRecords.batches.asScala.isEmpty)
+      fetchCallbackFired = false
+
+      // fetch as follower to advance the high watermark
+      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)),
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+
+      // now all of the records should be fetchable
+      fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
+        isolationLevel = IsolationLevel.READ_COMMITTED)
+
+      assertTrue(fetchCallbackFired)
+      assertEquals(Errors.NONE, fetchError)
+      assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size)
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testFetchBeyondHighWatermarkReturnEmptyResponse() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
@@ -211,8 +339,8 @@ class ReplicaManagerTest {
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
+
     try {
-      
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
       
@@ -234,6 +362,7 @@ class ReplicaManagerTest {
           timeout = 1000,
           requiredAcks = -1,
           internalTopicsAllowed = false,
+          isFromClient = true,
           entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)),
           responseCallback = produceCallback)
       
@@ -254,7 +383,8 @@ class ReplicaManagerTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
-        responseCallback = fetchCallback)
+        responseCallback = fetchCallback,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
         
       
       assertTrue(fetchCallbackFired)
@@ -270,11 +400,12 @@ class ReplicaManagerTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
-        responseCallback = fetchCallback)
+        responseCallback = fetchCallback,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED)
           
-        assertTrue(fetchCallbackFired)
-        assertEquals("Should not give an exception", Errors.NONE, fetchError)
-        assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
+      assertTrue(fetchCallbackFired)
+      assertEquals("Should not give an exception", Errors.NONE, fetchError)
+      assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
     } finally {
       rm.shutdown(checkpointHW = false)
     }
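
Note on testReadCommittedFetchLimitedAtLSO above: a READ_COMMITTED fetch is bounded by
the last stable offset (LSO) rather than the high watermark, and transactional data
stays invisible until the transaction's marker is covered by the high watermark. A
rough sketch of that bound, with illustrative names rather than the actual
Log/ReplicaManager code:

object LsoSketch {
  // The LSO cannot pass the first offset of an unresolved transaction, and it is
  // also capped by the high watermark, so an unreplicated marker keeps data hidden.
  def lastStableOffset(highWatermark: Long, firstUnstableOffset: Option[Long]): Long =
    firstUnstableOffset.fold(highWatermark)(math.min(_, highWatermark))

  // Upper bound of a fetch at the given isolation level.
  def fetchUpperBound(readCommitted: Boolean, highWatermark: Long,
                      firstUnstableOffset: Option[Long]): Long =
    if (readCommitted) lastStableOffset(highWatermark, firstUnstableOffset)
    else highWatermark
}

In the test, the transactional batches leave the first unstable offset at 0, so the
READ_COMMITTED fetch from offset 0 returns nothing even though the high watermark has
advanced; only after the commit marker is appended and a follower fetch moves the high
watermark past it do all numRecords + 1 batches become fetchable.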

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5e91c9b..9270544 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -49,7 +49,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private var leaderNode: KafkaServer = null
 
  // Run tests concurrently since a throttle could last up to 1 second, because the allocated quota percentage is very low
-  case class Task(val apiKey: ApiKeys, val future: Future[_])
+  case class Task(apiKey: ApiKeys, future: Future[_])
   private val executor = Executors.newCachedThreadPool
   private val tasks = new ListBuffer[Task]
 
@@ -183,7 +183,8 @@ class RequestQuotaTest extends BaseRequestTest {
           new requests.MetadataRequest.Builder(List(topic).asJava)
 
         case ApiKeys.LIST_OFFSETS =>
-          requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
+          requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+            .setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.LEADER_AND_ISR =>
           new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
@@ -285,7 +286,7 @@ class RequestQuotaTest extends BaseRequestTest {
     apiKey.parseResponse(request.version, responseBuffer)
   }
 
-  case class Client(val clientId: String, val apiKey: ApiKeys) {
+  case class Client(clientId: String, apiKey: ApiKeys) {
     var correlationId: Int = 0
     val builder = requestBuilder(apiKey)
     def runUntil(until: (Struct) => Boolean): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ba17db6..d7822c1 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -28,7 +28,8 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.IsolationLevel
 import org.easymock.EasyMock
 import org.junit.Assert._
 
@@ -79,12 +80,12 @@ class SimpleFetchTest {
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
+    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
       )).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn(
+    EasyMock.expect(log.read(0, fetchSize, None, true, IsolationLevel.READ_UNCOMMITTED)).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bedc7bc..5d9e7c1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -735,7 +735,7 @@ object TestUtils extends Logging {
    * @return The new leader or assertion failure if timeout is reached.
    */
   def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int,
-                                        timeoutMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
+                                        timeoutMs: Long = 30000,
                                         oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()

