Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B818200C72 for ; Fri, 12 May 2017 21:32:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 29F4E160BB8; Fri, 12 May 2017 19:32:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 72341160BA8 for ; Fri, 12 May 2017 21:32:53 +0200 (CEST) Received: (qmail 89211 invoked by uid 500); 12 May 2017 19:32:52 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 89202 invoked by uid 99); 12 May 2017 19:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 May 2017 19:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7CC6DDFD70; Fri, 12 May 2017 19:32:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jgus@apache.org To: commits@kafka.apache.org Date: Fri, 12 May 2017 19:32:52 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] kafka git commit: KAFKA-5196; Make LogCleaner transaction-aware archived-at: Fri, 12 May 2017 19:32:54 -0000 Repository: kafka Updated Branches: refs/heads/trunk 1c2bbaa50 -> 7baa58d79 http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index b4fe9fb..2283077 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -375,9 +375,10 @@ class LogTest { producerId: Long, epoch: Short, offset: Long = 0L, - coordinatorEpoch: Int = 0): MemoryRecords = { + coordinatorEpoch: Int = 0, + partitionLeaderEpoch: Int = 0): MemoryRecords = { val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch) - MemoryRecords.withEndTransactionMarker(offset, producerId, epoch, marker) + MemoryRecords.withEndTransactionMarker(offset, time.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker) } @Test @@ -2382,22 +2383,22 @@ class LogTest { private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) - private def appendTransactionalAsLeader(log: Log, pid: Long, producerEpoch: Short): Int => Unit = { + private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = { var sequence = 0 numRecords: Int => { val simpleRecords = (sequence until sequence + numRecords).map { seq => new SimpleRecord(s"$seq".getBytes) } - val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, + val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*) log.appendAsLeader(records, leaderEpoch = 0) sequence += numRecords } } - private def appendEndTxnMarkerAsLeader(log: Log, pid: Long, producerEpoch: Short, + private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short, controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { - val records = endTxnRecords(controlType, pid, producerEpoch, coordinatorEpoch = coordinatorEpoch) + val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch) log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0) } @@ -2409,10 +2410,10 @@ class LogTest { log.appendAsLeader(records, leaderEpoch = 0) } - private def appendTransactionalToBuffer(buffer: ByteBuffer, pid: Long, epoch: Short): (Long, Int) => Unit = { + private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short): (Long, Int) => Unit = { var sequence = 0 (offset: Long, numRecords: Int) => { - val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, pid, epoch, sequence, true) + val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId, producerEpoch, sequence, true) for (seq <- sequence until sequence + numRecords) { val record = new SimpleRecord(s"$seq".getBytes) builder.append(record) @@ -2424,9 +2425,9 @@ class LogTest { } private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long, - controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { + controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { val marker = new EndTransactionMarker(controlType, coordinatorEpoch) - MemoryRecords.writeEndTransactionalMarker(buffer, offset, producerId, producerEpoch, marker) + MemoryRecords.writeEndTransactionalMarker(buffer, offset, time.milliseconds(), 0, producerId, producerEpoch, marker) } private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index 4546818..16173eb 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -85,15 +85,13 @@ class TransactionIndexTest extends JUnitSuite { @Test def testCollectAbortedTransactions(): Unit = { - val abortedTxns = List( + val abortedTransactions = List( new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11), new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13), new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40)) - abortedTxns.foreach(index.append) - - val abortedTransactions = abortedTxns.map(_.asAbortedTransaction) + abortedTransactions.foreach(index.append) var result = index.collectAbortedTxns(0L, 100L) assertEquals(abortedTransactions, result.abortedTransactions) @@ -122,14 +120,13 @@ class TransactionIndexTest extends JUnitSuite { @Test def testTruncate(): Unit = { - val abortedTxns = List( + val abortedTransactions = List( new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2), new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16), new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40)) - val abortedTransactions = abortedTxns.map(_.asAbortedTransaction) - abortedTxns.foreach(index.append) + abortedTransactions.foreach(index.append) index.truncateTo(51) assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)