kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] kafka git commit: KAFKA-5196; Make LogCleaner transaction-aware
Date Fri, 12 May 2017 19:32:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1c2bbaa50 -> 7baa58d79


http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b4fe9fb..2283077 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -375,9 +375,10 @@ class LogTest {
                             producerId: Long,
                             epoch: Short,
                             offset: Long = 0L,
-                            coordinatorEpoch: Int = 0): MemoryRecords = {
+                            coordinatorEpoch: Int = 0,
+                            partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
-    MemoryRecords.withEndTransactionMarker(offset, producerId, epoch, marker)
+    MemoryRecords.withEndTransactionMarker(offset, time.milliseconds(), partitionLeaderEpoch,
producerId, epoch, marker)
   }
 
   @Test
@@ -2382,22 +2383,22 @@ class LogTest {
 
   private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
 
-  private def appendTransactionalAsLeader(log: Log, pid: Long, producerEpoch: Short): Int
=> Unit = {
+  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short):
Int => Unit = {
     var sequence = 0
     numRecords: Int => {
       val simpleRecords = (sequence until sequence + numRecords).map { seq =>
         new SimpleRecord(s"$seq".getBytes)
       }
-      val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid,
+      val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
         producerEpoch, sequence, simpleRecords: _*)
       log.appendAsLeader(records, leaderEpoch = 0)
       sequence += numRecords
     }
   }
 
-  private def appendEndTxnMarkerAsLeader(log: Log, pid: Long, producerEpoch: Short,
+  private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short,
                                          controlType: ControlRecordType, coordinatorEpoch:
Int = 0): Unit = {
-    val records = endTxnRecords(controlType, pid, producerEpoch, coordinatorEpoch = coordinatorEpoch)
+    val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch
= coordinatorEpoch)
     log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0)
   }
 
@@ -2409,10 +2410,10 @@ class LogTest {
     log.appendAsLeader(records, leaderEpoch = 0)
   }
 
-  private def appendTransactionalToBuffer(buffer: ByteBuffer, pid: Long, epoch: Short): (Long,
Int) => Unit = {
+  private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch:
Short): (Long, Int) => Unit = {
     var sequence = 0
     (offset: Long, numRecords: Int) => {
-      val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, pid, epoch,
sequence, true)
+      val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId,
producerEpoch, sequence, true)
       for (seq <- sequence until sequence + numRecords) {
         val record = new SimpleRecord(s"$seq".getBytes)
         builder.append(record)
@@ -2424,9 +2425,9 @@ class LogTest {
   }
 
   private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch:
Short, offset: Long,
-                                    controlType: ControlRecordType, coordinatorEpoch: Int
= 0): Unit = {
+                                         controlType: ControlRecordType, coordinatorEpoch:
Int = 0): Unit = {
     val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, producerId, producerEpoch,
marker)
+    MemoryRecords.writeEndTransactionalMarker(buffer, offset, time.milliseconds(), 0, producerId,
producerEpoch, marker)
   }
 
   private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords:
Int): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 4546818..16173eb 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -85,15 +85,13 @@ class TransactionIndexTest extends JUnitSuite {
 
   @Test
   def testCollectAbortedTransactions(): Unit = {
-    val abortedTxns = List(
+    val abortedTransactions = List(
       new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset
= 11),
       new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset
= 13),
       new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset
= 25),
       new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset
= 40))
 
-    abortedTxns.foreach(index.append)
-
-    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
+    abortedTransactions.foreach(index.append)
 
     var result = index.collectAbortedTxns(0L, 100L)
     assertEquals(abortedTransactions, result.abortedTransactions)
@@ -122,14 +120,13 @@ class TransactionIndexTest extends JUnitSuite {
 
   @Test
   def testTruncate(): Unit = {
-    val abortedTxns = List(
+    val abortedTransactions = List(
       new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset
= 2),
       new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset
= 16),
       new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset
= 25),
       new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset
= 40))
-    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
 
-    abortedTxns.foreach(index.append)
+    abortedTransactions.foreach(index.append)
 
     index.truncateTo(51)
     assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)


Mime
View raw message