Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AEC70200CAC for ; Mon, 5 Jun 2017 01:37:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A256E160BE5; Sun, 4 Jun 2017 23:37:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 80580160BE0 for ; Mon, 5 Jun 2017 01:37:02 +0200 (CEST) Received: (qmail 66170 invoked by uid 500); 4 Jun 2017 23:37:01 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 66161 invoked by uid 99); 4 Jun 2017 23:37:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jun 2017 23:37:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E488DFD70; Sun, 4 Jun 2017 23:37:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: <0a3ff445d28e4c69a949e1ba148b66f0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch Date: Sun, 4 Jun 2017 23:37:01 +0000 (UTC) archived-at: Sun, 04 Jun 2017 23:37:04 -0000 Repository: kafka Updated Branches: refs/heads/trunk c7bc8f7d8 -> 3557f097b KAFKA-5355; Test cases to ensure isolation level propagated in delayed fetch Include a few logging improvements. Author: Jason Gustafson Reviewers: Ismael Juma Closes #3230 from hachikuji/KAFKA-5355-TESTS Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3557f097 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3557f097 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3557f097 Branch: refs/heads/trunk Commit: 3557f097b81edd6518100d79b442223fadf9f81f Parents: c7bc8f7 Author: Jason Gustafson Authored: Mon Jun 5 00:36:16 2017 +0100 Committer: Ismael Juma Committed: Mon Jun 5 00:36:26 2017 +0100 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 19 +- .../producer/internals/TransactionManager.java | 8 + .../common/requests/TxnOffsetCommitRequest.java | 4 + .../group/GroupMetadataManager.scala | 5 +- core/src/main/scala/kafka/log/Log.scala | 9 +- .../main/scala/kafka/server/DelayedFetch.scala | 3 +- .../scala/kafka/server/ReplicaManager.scala | 40 +++- .../kafka/api/TransactionsTest.scala | 108 +++++++--- .../ReplicaFetcherThreadFatalErrorTest.scala | 4 +- .../test/scala/other/kafka/StressTestLog.scala | 3 +- .../group/GroupMetadataManagerTest.scala | 3 +- .../TransactionStateManagerTest.scala | 4 +- .../unit/kafka/log/BrokerCompressionTest.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 11 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 56 +++--- .../kafka/server/ReplicaManagerQuotasTest.scala | 20 +- .../unit/kafka/server/ReplicaManagerTest.scala | 197 ++++++++++--------- .../unit/kafka/server/SimpleFetchTest.scala | 26 ++- .../test/scala/unit/kafka/utils/TestUtils.scala | 3 +- 19 files changed, 323 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 9be8aa3..e73ff4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -198,7 +198,8 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { final FetchRequest.Builder request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); - log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget); + log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(), + fetchTarget); client.send(fetchTarget, request) .addListener(new RequestFutureListener() { @Override @@ -221,9 +222,8 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { long fetchOffset = request.fetchData().get(partition).fetchOffset; FetchResponse.PartitionData fetchData = entry.getValue(); - log.debug("Fetch at offset {} for partition {} returned fetch data {}", fetchOffset, - partition, fetchData); - + log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", + isolationLevel, fetchOffset, partition, fetchData); completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); } @@ -782,8 +782,10 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { } long position = this.subscriptions.position(partition); - fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize)); - log.debug("Added fetch request for partition {} at offset {} to node {}", partition, position, node); + fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, + this.fetchSize)); + log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel, + partition, position, node); } else { log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } @@ -1038,8 +1040,9 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { if (containsAbortMarker(currentBatch)) { abortedProducerIds.remove(producerId); } else if (isBatchAborted(currentBatch)) { - log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}", - producerId, currentBatch.baseOffset(), partition); + log.debug("Skipping aborted record batch from partition {} with producerId {} and " + + "offsets {} to {}", + partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset()); nextFetchOffset = currentBatch.nextOffset(); continue; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index dad6b5d..c081b23 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -212,6 +212,7 @@ public class TransactionManager { throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " + "active transaction"); + log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId); AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId); AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets); @@ -226,6 +227,7 @@ public class TransactionManager { if (partitionsInTransaction.contains(topicPartition)) return; + log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition); newPartitionsInTransaction.add(topicPartition); } @@ -749,6 +751,7 @@ public class TransactionManager { abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors")); } else { Set addedPartitions = errors.keySet(); + log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions); partitionsInTransaction.addAll(addedPartitions); pendingPartitionsInTransaction.removeAll(addedPartitions); transactionStarted = true; @@ -886,6 +889,9 @@ public class TransactionManager { Errors error = addOffsetsToTxnResponse.error(); if (error == Errors.NONE) { + log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix, + builder.consumerGroupId()); + // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId())); transactionStarted = true; @@ -946,6 +952,8 @@ public class TransactionManager { TopicPartition topicPartition = entry.getKey(); Errors error = entry.getValue(); if (error == Errors.NONE) { + log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix, + builder.offsets(), builder.consumerGroupId()); pendingTxnOffsetCommits.remove(topicPartition); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 2ea8ecf..20522af 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -59,6 +59,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest { return consumerGroupId; } + public Map offsets() { + return offsets; + } + @Override public TxnOffsetCommitRequest build(short version) { return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets); http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index b23514a..db3d936 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -484,9 +484,8 @@ class GroupMetadataManager(brokerId: Int, val removedGroups = mutable.Set[String]() while (currOffset < highWaterMark && !shuttingDown.get()) { - val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true, - isolationLevel = IsolationLevel.READ_UNCOMMITTED) - + val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, + minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED) val memRecords = fetchDataInfo.records match { case records: MemoryRecords => records case fileRecords: FileRecords => http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b9968a2..5522508 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -830,6 +830,11 @@ class Log(@volatile var dir: File, } } + private[log] def readUncommitted(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, + minOneMessage: Boolean = false): FetchDataInfo = { + read(startOffset, maxLength, maxOffset, minOneMessage, isolationLevel = IsolationLevel.READ_UNCOMMITTED) + } + /** * Read messages from the log. * @@ -849,7 +854,7 @@ class Log(@volatile var dir: File, * @return The fetch data information including fetch starting offset metadata and messages read. */ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = { + isolationLevel: IsolationLevel): FetchDataInfo = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) // Because we don't use lock for reading, the synchronization is a little bit tricky. @@ -1001,7 +1006,7 @@ class Log(@volatile var dir: File, */ def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { try { - val fetchDataInfo = read(offset, 1) + val fetchDataInfo = readUncommitted(offset, 1) fetchDataInfo.fetchOffsetMetadata } catch { case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index fb89800..c5cdf57 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -153,8 +153,7 @@ class DelayedFetch(delayMs: Long, hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit, readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, quota = quota, - isolationLevel = isolationLevel - ) + isolationLevel = isolationLevel) val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records) http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5e1c9c1..a8d3e94 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -129,7 +129,36 @@ class ReplicaManager(val config: KafkaConfig, quotaManager: ReplicationQuotaManager, val brokerTopicStats: BrokerTopicStats, val metadataCache: MetadataCache, - threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { + val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], + val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], + val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], + threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup { + + def this(config: KafkaConfig, + metrics: Metrics, + time: Time, + zkUtils: ZkUtils, + scheduler: Scheduler, + logManager: LogManager, + isShuttingDown: AtomicBoolean, + quotaManager: ReplicationQuotaManager, + brokerTopicStats: BrokerTopicStats, + metadataCache: MetadataCache, + threadNamePrefix: Option[String] = None) { + this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown, + quotaManager, brokerTopicStats, metadataCache, + DelayedOperationPurgatory[DelayedProduce]( + purgatoryName = "Produce", brokerId = config.brokerId, + purgeInterval = config.producerPurgatoryPurgeIntervalRequests), + DelayedOperationPurgatory[DelayedFetch]( + purgatoryName = "Fetch", brokerId = config.brokerId, + purgeInterval = config.fetchPurgatoryPurgeIntervalRequests), + DelayedOperationPurgatory[DelayedDeleteRecords]( + purgatoryName = "DeleteRecords", brokerId = config.brokerId, + purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), + threadNamePrefix) + } + /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId @@ -146,13 +175,6 @@ class ReplicaManager(val config: KafkaConfig, private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) - val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce]( - purgatoryName = "Produce", brokerId = localBrokerId, purgeInterval = config.producerPurgatoryPurgeIntervalRequests) - val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch]( - purgatoryName = "Fetch", brokerId = localBrokerId, purgeInterval = config.fetchPurgatoryPurgeIntervalRequests) - val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords]( - purgatoryName = "DeleteRecords", brokerId = localBrokerId, purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests) - val leaderCount = newGauge( "LeaderCount", new Gauge[Int] { @@ -641,7 +663,7 @@ class ReplicaManager(val config: KafkaConfig, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Seq[(TopicPartition, LogReadResult)] = { + isolationLevel: IsolationLevel): Seq[(TopicPartition, LogReadResult)] = { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { val offset = fetchInfo.fetchOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 9aceec8..4593d9c 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} -import org.apache.kafka.clients.producer.KafkaProducer +import kafka.utils.TestUtils.consumeRecords +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.ProducerFencedException import org.apache.kafka.common.protocol.SecurityProtocol @@ -32,7 +33,7 @@ import org.junit.{After, Before, Test} import org.junit.Assert._ import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, Buffer} +import scala.collection.mutable.Buffer import scala.concurrent.ExecutionException class TransactionsTest extends KafkaServerTestHarness { @@ -49,7 +50,7 @@ class TransactionsTest extends KafkaServerTestHarness { val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps())) + TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps())) } @Before @@ -62,11 +63,11 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig) for (_ <- 0 until transactionalProducerCount) - transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers) + createTransactionalProducer("transactional-producer") for (_ <- 0 until transactionalConsumerCount) - transactionalConsumers += transactionalConsumer("transactional-group") + createReadCommittedConsumer("transactional-group") for (_ <- 0 until nonTransactionalConsumerCount) - nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group") + createReadUncommittedConsumer("non-transactional-group") } @After @@ -79,9 +80,9 @@ class TransactionsTest extends KafkaServerTestHarness { @Test def testBasicTransactions() = { - val producer = transactionalProducers(0) - val consumer = transactionalConsumers(0) - val unCommittedConsumer = nonTransactionalConsumers(0) + val producer = transactionalProducers.head + val consumer = transactionalConsumers.head + val unCommittedConsumer = nonTransactionalConsumers.head producer.initTransactions() @@ -99,12 +100,12 @@ class TransactionsTest extends KafkaServerTestHarness { consumer.subscribe(List(topic1, topic2).asJava) unCommittedConsumer.subscribe(List(topic1, topic2).asJava) - val records = pollUntilExactlyNumRecords(consumer, 2) + val records = consumeRecords(consumer, 2) records.foreach { record => TestUtils.assertCommittedAndGetValue(record) } - val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4) + val allRecords = consumeRecords(unCommittedConsumer, 4) val expectedValues = List("1", "2", "3", "4").toSet allRecords.foreach { record => assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record))) @@ -112,6 +113,53 @@ class TransactionsTest extends KafkaServerTestHarness { } @Test + def testReadCommittedConsumerShouldNotSeeUndecidedData(): Unit = { + val producer1 = transactionalProducers.head + val producer2 = createTransactionalProducer("other") + val readCommittedConsumer = transactionalConsumers.head + val readUncommittedConsumer = nonTransactionalConsumers.head + + producer1.initTransactions() + producer2.initTransactions() + + producer1.beginTransaction() + producer2.beginTransaction() + producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes)) + producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "1".getBytes)) + producer2.flush() + + producer1.send(new ProducerRecord(topic1, 0, "a".getBytes, "1".getBytes)) + producer1.send(new ProducerRecord(topic1, 0, "b".getBytes, "2".getBytes)) + producer1.send(new ProducerRecord(topic2, 0, "c".getBytes, "3".getBytes)) + producer1.send(new ProducerRecord(topic2, 0, "d".getBytes, "4".getBytes)) + producer1.flush() + + producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes)) + producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "2".getBytes)) + producer2.commitTransaction() + + // ensure the records are visible to the read uncommitted consumer + readUncommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava) + consumeRecords(readUncommittedConsumer, 8) + readUncommittedConsumer.unsubscribe() + + // we should only see the first two records which come before the undecided second transaction + readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava) + val records = consumeRecords(readCommittedConsumer, 2) + records.foreach { record => + assertEquals("x", new String(record.key)) + assertEquals("1", new String(record.value)) + } + + // even if we seek to the end, we should not be able to see the undecided data + assertEquals(2, readCommittedConsumer.assignment.size) + readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment) + readCommittedConsumer.assignment.asScala.foreach { tp => + assertEquals(1L, readCommittedConsumer.position(tp)) + } + } + + @Test def testSendOffsets() = { // The basic plan for the test is as follows: // 1. Seed topic1 with 1000 unique, numbered, messages. @@ -129,7 +177,7 @@ class TransactionsTest extends KafkaServerTestHarness { val producer = transactionalProducers(0) - val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) + val consumer = createReadCommittedConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) consumer.subscribe(List(topic1).asJava) producer.initTransactions() @@ -208,7 +256,7 @@ class TransactionsTest extends KafkaServerTestHarness { producer2.commitTransaction() // ok - val records = pollUntilExactlyNumRecords(consumer, 2) + val records = consumeRecords(consumer, 2) records.foreach { record => TestUtils.assertCommittedAndGetValue(record) } @@ -246,7 +294,7 @@ class TransactionsTest extends KafkaServerTestHarness { producer2.commitTransaction() // ok - val records = pollUntilExactlyNumRecords(consumer, 2) + val records = consumeRecords(consumer, 2) records.foreach { record => TestUtils.assertCommittedAndGetValue(record) } @@ -290,7 +338,7 @@ class TransactionsTest extends KafkaServerTestHarness { producer2.commitTransaction() // ok - val records = pollUntilExactlyNumRecords(consumer, 2) + val records = consumeRecords(consumer, 2) records.foreach { record => TestUtils.assertCommittedAndGetValue(record) } @@ -336,7 +384,7 @@ class TransactionsTest extends KafkaServerTestHarness { producer2.commitTransaction() // ok - val records = pollUntilExactlyNumRecords(consumer, 2) + val records = consumeRecords(consumer, 2) records.foreach { record => TestUtils.assertCommittedAndGetValue(record) } @@ -345,7 +393,7 @@ class TransactionsTest extends KafkaServerTestHarness { private def serverProps() = { val serverProps = new Properties() serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) - // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // Set a smaller value for the number of partitions for the __consumer_offsets topic // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString) @@ -354,33 +402,35 @@ class TransactionsTest extends KafkaServerTestHarness { serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString) serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString) serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) + serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") serverProps } - private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = { + private def createReadCommittedConsumer(group: String = "group", maxPollRecords: Int = 500) = { val props = new Properties() props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString) - TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) + transactionalConsumers += consumer + consumer } - private def nonTransactionalConsumer(group: String = "group") = { + private def createReadUncommittedConsumer(group: String = "group") = { val props = new Properties() props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted") props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) + nonTransactionalConsumers += consumer + consumer } - private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { - val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() - TestUtils.waitUntilTrue(() => { - records ++= consumer.poll(50).asScala - records.size == numRecords - }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.") - records + private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = { + val producer = TestUtils.createTransactionalProducer(transactionalId, servers) + transactionalProducers += producer + producer } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala index cd0c74b..147e84a 100644 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala @@ -117,9 +117,9 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { quotaManager: ReplicationQuotaManager) = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) { override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { - val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("") + val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("") val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}" - fetcherThread(new FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics, + fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics, time, quotaManager)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 7583d45..6c134ac 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -25,6 +25,7 @@ import kafka.server.BrokerTopicStats import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.utils.Utils /** @@ -99,7 +100,7 @@ object StressTestLog { @volatile var offset = 0 override def work() { try { - log.read(offset, 1024, Some(offset+1)).records match { + log.read(offset, 1024, Some(offset+1), isolationLevel = IsolationLevel.READ_UNCOMMITTED).records match { case read: FileRecords if read.sizeInBytes > 0 => { val first = read.batches.iterator.next() require(first.lastOffset == offset, "We should either read nothing or the message we asked for.") http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 4c2eb27..6245e85 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1310,7 +1310,8 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) - EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) + EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), + EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) .andReturn(records.buffer) http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 54246c4..b5d7903 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -517,8 +517,8 @@ class TransactionStateManagerTest { EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset)) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) - EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), - EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) + EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), + EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) .andReturn(records.buffer) http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index ee46341..3d19442 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -62,7 +62,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) - def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next() + def readBatch(offset: Int) = log.readUncommitted(offset, 4096).records.batches.iterator.next() if (!brokerCompression.equals("producer")) { val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression) http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a6fe2e4..5b29471 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -21,10 +21,12 @@ import java.io._ import java.util.Properties import kafka.common._ +import kafka.server.FetchDataInfo import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.OffsetOutOfRangeException +import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -105,10 +107,10 @@ class LogManagerTest { // there should be a log file, two indexes, and the leader epoch checkpoint assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes) try { - log.read(0, 1024) + log.readUncommitted(0, 1024) fail("Should get exception from fetching earlier.") } catch { case _: OffsetOutOfRangeException => // This is good. @@ -154,9 +156,9 @@ class LogManagerTest { // there should be a log file, two indexes (the txn index is created lazily), // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments) assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes) try { - log.read(0, 1024) + log.readUncommitted(0, 1024) fail("Should get exception from fetching earlier.") } catch { case _: OffsetOutOfRangeException => // This is good. @@ -302,7 +304,6 @@ class LogManagerTest { } } - private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = { TestUtils.createLogManager( defaultConfig = logConfig, http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6fcc7ae..7b67857 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -551,7 +551,7 @@ class LogTest { log.appendAsFollower(memoryRecords) log.flush() - val fetchedData = log.read(0, Int.MaxValue) + val fetchedData = log.readUncommitted(0, Int.MaxValue) val origIterator = memoryRecords.batches.iterator() for (batch <- fetchedData.records.batches.asScala) { @@ -736,13 +736,14 @@ class LogTest { log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0) for(i <- values.indices) { - val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next() + val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next() assertEquals("Offset read should match order appended.", i, read.lastOffset) val actual = read.iterator.next() assertNull("Key should be null", actual.key) assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value) } - assertEquals("Reading beyond the last message returns nothing.", 0, log.read(values.length, 100, None).records.batches.asScala.size) + assertEquals("Reading beyond the last message returns nothing.", 0, + log.readUncommitted(values.length, 100, None).records.batches.asScala.size) } /** @@ -763,7 +764,7 @@ class LogTest { log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) for(i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) - val read = log.read(i, 100, None).records.records.iterator.next() + val read = log.readUncommitted(i, 100, None).records.records.iterator.next() assertEquals("Offset read should match message id.", messageIds(idx), read.offset) assertEquals("Message should match appended.", records(idx), new SimpleRecord(read)) } @@ -790,7 +791,7 @@ class LogTest { log.logSegments.head.truncateTo(1) assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, - log.read(1, 200, None).records.batches.iterator.next().lastOffset) + log.readUncommitted(1, 200, None).records.batches.iterator.next().lastOffset) } @Test @@ -809,16 +810,16 @@ class LogTest { for (i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) val reads = Seq( - log.read(i, 1, minOneMessage = true), - log.read(i, 100, minOneMessage = true), - log.read(i, 100, Some(10000), minOneMessage = true) + log.readUncommitted(i, 1, minOneMessage = true), + log.readUncommitted(i, 100, minOneMessage = true), + log.readUncommitted(i, 100, Some(10000), minOneMessage = true) ).map(_.records.records.iterator.next()) reads.foreach { read => assertEquals("Offset read should match message id.", messageIds(idx), read.offset) assertEquals("Message should match appended.", records(idx), new SimpleRecord(read)) } - assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq) + assertEquals(Seq.empty, log.readUncommitted(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq) } } @@ -836,14 +837,14 @@ class LogTest { log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) for (i <- 50 until messageIds.max) { - assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records) + assertEquals(MemoryRecords.EMPTY, log.readUncommitted(i, 0).records) // we return an incomplete message instead of an empty one for the case below // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is // larger than the fetch size // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty // partition - val fetchInfo = log.read(i, 1) + val fetchInfo = log.readUncommitted(i, 1) assertTrue(fetchInfo.firstEntryIncomplete) assertTrue(fetchInfo.records.isInstanceOf[FileRecords]) assertEquals(1, fetchInfo.records.sizeInBytes) @@ -867,23 +868,25 @@ class LogTest { brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) - assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes) + assertEquals("Reading at the log end offset should produce 0 byte read.", 0, + log.readUncommitted(1025, 1000).records.sizeInBytes) try { - log.read(0, 1000) + log.readUncommitted(0, 1000) fail("Reading below the log start offset should throw OffsetOutOfRangeException") } catch { case _: OffsetOutOfRangeException => // This is good. } try { - log.read(1026, 1000) + log.readUncommitted(1026, 1000) fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException") } catch { case _: OffsetOutOfRangeException => // This is good. } - assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).records.sizeInBytes) + assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, + log.readUncommitted(1025, 1000, Some(1024)).records.sizeInBytes) } /** @@ -906,7 +909,7 @@ class LogTest { /* do successive reads to ensure all our messages are there */ var offset = 0L for(i <- 0 until numMessages) { - val messages = log.read(offset, 1024*1024).records.batches + val messages = log.readUncommitted(offset, 1024*1024).records.batches val head = messages.iterator.next() assertEquals("Offsets not equal", offset, head.lastOffset) @@ -917,7 +920,8 @@ class LogTest { assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp) offset = head.lastOffset + 1 } - val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records + val lastRead = log.readUncommitted(startOffset = numMessages, maxLength = 1024*1024, + maxOffset = Some(numMessages + 1)).records assertEquals("Should be no more messages", 0, lastRead.records.asScala.size) // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure @@ -941,7 +945,7 @@ class LogTest { log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)), leaderEpoch = 0) - def read(offset: Int) = log.read(offset, 4096).records.records + def read(offset: Int) = log.readUncommitted(offset, 4096).records.records /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset) @@ -1152,7 +1156,7 @@ class LogTest { val messages = (0 until numMessages).map { i => MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes())) } - messages.foreach(log.appendAsFollower(_)) + messages.foreach(log.appendAsFollower) val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries } assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries) assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}", @@ -1190,7 +1194,7 @@ class LogTest { assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) for(i <- 0 until numMessages) { - assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset) + assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else @@ -1215,7 +1219,8 @@ class LogTest { var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, brokerTopicStats = brokerTopicStats, time = time) for(i <- 0 until numMessages) - log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), + timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -1226,11 +1231,10 @@ class LogTest { log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, brokerTopicStats = brokerTopicStats, time = time) val segArray = log.logSegments.toArray - for (i <- 0 until segArray.size - 1) { + for (i <- segArray.indices.init) { assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) assertEquals("The time index file size should be 0", 0, segArray(i).timeIndex.file.length) } - } /** @@ -1272,7 +1276,7 @@ class LogTest { brokerTopicStats = brokerTopicStats, time = time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { - assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset) + assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else @@ -1544,7 +1548,7 @@ class LogTest { brokerTopicStats = brokerTopicStats, time = time) log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) - val head = log.read(0, 4096, None).records.records.iterator.next() + val head = log.readUncommitted(0, 4096, None).records.records.iterator.next() assertEquals(0, head.offset) assertTrue("Message payload should be null.", !head.hasValue) } @@ -2032,7 +2036,7 @@ class LogTest { //Then leader epoch should be set on messages for (i <- records.indices) { - val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next() + val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next() assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index e770106..2ee08a2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -42,7 +42,7 @@ class ReplicaManagerQuotasTest { val topicPartition1 = new TopicPartition("test-topic", 1) val topicPartition2 = new TopicPartition("test-topic", 2) val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100)) - var replicaManager: ReplicaManager = null + var replicaManager: ReplicaManager = _ @Test def shouldExcludeSubsequentThrottledPartitions(): Unit = { @@ -61,7 +61,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) @@ -86,7 +87,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, @@ -110,7 +112,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, @@ -134,7 +137,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) @@ -153,14 +157,16 @@ class ReplicaManagerQuotasTest { expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes() //if we ask for len 1 return a message - expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), anyObject())).andReturn( + expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), + EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, record) )).anyTimes() //if we ask for len = 0, return 0 messages - expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), anyObject())).andReturn( + expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), + EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.EMPTY http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6efd0a3..a33968a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.log.LogConfig import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} import TestUtils.createBroker +import kafka.utils.timer.MockTimer import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors @@ -165,26 +166,12 @@ class ReplicaManagerTest { rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {}) rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) - // Append a message. - rm.appendRecords( - timeout = 1000, - requiredAcks = -1, - internalTopicsAllowed = false, - isFromClient = true, - entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE, - new SimpleRecord("first message".getBytes()))), - responseCallback = produceCallback) + val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes())) + appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback) // Fetch some messages - rm.fetchMessages( - timeout = 1000, - replicaId = -1, - fetchMinBytes = 100000, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), fetchCallback, + minBytes = 100000) // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, @@ -214,23 +201,32 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes() EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes() EasyMock.replay(metadataCache) - val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, + + val timer = new MockTimer + val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( + purgatoryName = "Produce", timer, reaperEnabled = false) + val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( + purgatoryName = "Fetch", timer, reaperEnabled = false) + val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( + purgatoryName = "DeleteRecords", timer, reaperEnabled = false) + + val replicaManager = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, - metadataCache, Option(this.getClass.getName)) + metadataCache, mockProducePurgatory, mockFetchPurgatory, mockDeleteRecordsPurgatory, Option(this.getClass.getName)) try { val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava - val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) + val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0)) partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {}) - rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {}) + replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = responseStatus.values.foreach { status => @@ -245,13 +241,7 @@ class ReplicaManagerTest { for (sequence <- 0 until numRecords) { val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) - rm.appendRecords( - timeout = 1000, - requiredAcks = -1, - internalTopicsAllowed = false, - isFromClient = true, - entriesPerPartition = Map(new TopicPartition(topic, 0) -> records), - responseCallback = produceCallback) + appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback) } var fetchCallbackFired = false @@ -263,26 +253,22 @@ class ReplicaManagerTest { fetchCallbackFired = true } - def fetchMessages(fetchInfos: Seq[(TopicPartition, PartitionData)], - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { - rm.fetchMessages( - timeout = 1000, - replicaId = 1, - fetchMinBytes = 0, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - fetchInfos = fetchInfos, - responseCallback = fetchCallback, - isolationLevel = isolationLevel) - } - // fetch as follower to advance the high watermark - fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)), - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED) // fetch should return empty since LSO should be stuck at 0 - fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - isolationLevel = IsolationLevel.READ_COMMITTED) + fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) + assertTrue(fetchCallbackFired) + assertEquals(Errors.NONE, fetchError) + assertTrue(fetchedRecords.batches.asScala.isEmpty) + fetchCallbackFired = false + + // delayed fetch should timeout and return nothing + fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000) + timer.advanceClock(1001) assertTrue(fetchCallbackFired) assertEquals(Errors.NONE, fetchError) @@ -292,18 +278,13 @@ class ReplicaManagerTest { // now commit the transaction val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) - rm.appendRecords( - timeout = 1000, - requiredAcks = -1, - internalTopicsAllowed = false, - isFromClient = false, - entriesPerPartition = Map(new TopicPartition(topic, 0) -> commitRecordBatch), - responseCallback = produceCallback) + appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> commitRecordBatch), produceCallback, + isFromClient = false) // the LSO has advanced, but the appended commit marker has not been replicated, so // none of the data from the transaction should be visible yet - fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - isolationLevel = IsolationLevel.READ_COMMITTED) + fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) assertTrue(fetchCallbackFired) assertEquals(Errors.NONE, fetchError) @@ -311,18 +292,18 @@ class ReplicaManagerTest { fetchCallbackFired = false // fetch as follower to advance the high watermark - fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)), - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED) // now all of the records should be fetchable - fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - isolationLevel = IsolationLevel.READ_COMMITTED) + fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) assertTrue(fetchCallbackFired) assertEquals(Errors.NONE, fetchError) assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size) } finally { - rm.shutdown(checkpointHW = false) + replicaManager.shutdown(checkpointHW = false) } } @@ -349,10 +330,10 @@ class ReplicaManagerTest { try { val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava - + val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) partition.getOrCreateReplica(0) - + // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, @@ -361,17 +342,13 @@ class ReplicaManagerTest { rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {} - + // Append a couple of messages. - for(i <- 1 to 2) - rm.appendRecords( - timeout = 1000, - requiredAcks = -1, - internalTopicsAllowed = false, - isFromClient = true, - entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)), - responseCallback = produceCallback) - + for(i <- 1 to 2) { + val records = TestUtils.singletonRecords(s"message $i".getBytes) + appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback) + } + var fetchCallbackFired = false var fetchError = Errors.NONE var fetchedRecords: Records = null @@ -380,35 +357,16 @@ class ReplicaManagerTest { fetchedRecords = responseStatus.map(_._2).head.records fetchCallbackFired = true } - + // Fetch a message above the high watermark as a follower - rm.fetchMessages( - timeout = 1000, - replicaId = 1, - fetchMinBytes = 0, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), - responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED) - - + fetchAsFollower(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback) assertTrue(fetchCallbackFired) assertEquals("Should not give an exception", Errors.NONE, fetchError) assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext) fetchCallbackFired = false - + // Fetch a message above the high watermark as a consumer - rm.fetchMessages( - timeout = 1000, - replicaId = -1, - fetchMinBytes = 0, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, - fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), - responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED) - + fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback) assertTrue(fetchCallbackFired) assertEquals("Should not give an exception", Errors.NONE, fetchError) assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords) @@ -416,4 +374,51 @@ class ReplicaManagerTest { rm.shutdown(checkpointHW = false) } } + + private def appendRecords(replicaManager: ReplicaManager, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit, + isFromClient: Boolean = true): Unit = { + replicaManager.appendRecords( + timeout = 1000, + requiredAcks = -1, + internalTopicsAllowed = false, + isFromClient = isFromClient, + entriesPerPartition = entriesPerPartition, + responseCallback = responseCallback) + } + + private def fetchAsConsumer(replicaManager: ReplicaManager, + fetchInfos: Seq[(TopicPartition, PartitionData)], + fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + minBytes: Int = 0, + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { + fetchMessages(replicaManager, replicaId = -1, fetchInfos, fetchCallback, minBytes, isolationLevel) + } + + private def fetchAsFollower(replicaManager: ReplicaManager, + fetchInfos: Seq[(TopicPartition, PartitionData)], + fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + minBytes: Int = 0, + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { + fetchMessages(replicaManager, replicaId = 1, fetchInfos, fetchCallback, minBytes, isolationLevel) + } + + private def fetchMessages(replicaManager: ReplicaManager, + replicaId: Int, + fetchInfos: Seq[(TopicPartition, PartitionData)], + fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + minBytes: Int, + isolationLevel: IsolationLevel): Unit = { + replicaManager.fetchMessages( + timeout = 1000, + replicaId = replicaId, + fetchMinBytes = minBytes, + fetchMaxBytes = Int.MaxValue, + hardMaxBytesLimit = false, + fetchInfos = fetchInfos, + responseCallback = fetchCallback, + isolationLevel = isolationLevel) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index ac851d8..dad4b78 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -62,7 +62,7 @@ class SimpleFetchTest { val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize)) - var replicaManager: ReplicaManager = null + var replicaManager: ReplicaManager = _ @Before def setUp() { @@ -80,13 +80,23 @@ class SimpleFetchTest { EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes() - EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true, IsolationLevel.READ_UNCOMMITTED)).andReturn( - FetchDataInfo( + EasyMock.expect(log.read( + startOffset = 0, + maxLength = fetchSize, + maxOffset = Some(partitionHW), + minOneMessage = true, + isolationLevel = IsolationLevel.READ_UNCOMMITTED)) + .andReturn(FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, recordToHW) )).anyTimes() - EasyMock.expect(log.read(0, fetchSize, None, true, IsolationLevel.READ_UNCOMMITTED)).andReturn( - FetchDataInfo( + EasyMock.expect(log.read( + startOffset = 0, + maxLength = fetchSize, + maxOffset = None, + minOneMessage = true, + isolationLevel = IsolationLevel.READ_UNCOMMITTED)) + .andReturn(FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, recordToLEO) )).anyTimes() @@ -162,7 +172,8 @@ class SimpleFetchTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicPartition) + quota = UnboundedQuota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition) val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next() assertEquals("Reading committed data should return messages only up to high watermark", recordToHW, new SimpleRecord(firstReadRecord)) @@ -174,7 +185,8 @@ class SimpleFetchTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicPartition) + quota = UnboundedQuota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED).find(_._1 == topicPartition) val firstRecord = readAllRecords.get._2.info.records.records.iterator.next() assertEquals("Reading any data can return messages up to the end of the log", recordToLEO, http://git-wip-us.apache.org/repos/asf/kafka/blob/3557f097/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index aae58cc..60b18d2 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -833,7 +833,8 @@ object TestUtils extends Logging { /** * Wait until the given condition is true or throw an exception if the given wait time elapses. */ - def waitUntilTrue(condition: () => Boolean, msg: String, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = { + def waitUntilTrue(condition: () => Boolean, msg: => String, + waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = { val startTime = System.currentTimeMillis() while (true) { if (condition())