From commits-return-9765-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Thu Jun 14 20:41:43 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 4A3FF180600 for ; Thu, 14 Jun 2018 20:41:41 +0200 (CEST) Received: (qmail 5815 invoked by uid 500); 14 Jun 2018 18:41:40 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 5806 invoked by uid 99); 14 Jun 2018 18:41:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jun 2018 18:41:40 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 88E8485343; Thu, 14 Jun 2018 18:41:39 +0000 (UTC) Date: Thu, 14 Jun 2018 18:41:39 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.0 updated: MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152900169864.28106.9507347517606490451@gitbox.apache.org> From: ijuma@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.0 X-Git-Reftype: branch X-Git-Oldrev: ace626c0b325093ea68decd6b0e7f3c0f5070cfd X-Git-Newrev: 040eab6f13beb356b35afe2bd5be8e61fee2cb3a X-Git-Rev: 040eab6f13beb356b35afe2bd5be8e61fee2cb3a X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new 040eab6 MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227) 040eab6 is described below commit 040eab6f13beb356b35afe2bd5be8e61fee2cb3a Author: Ismael Juma AuthorDate: Thu Jun 14 11:38:00 2018 -0700 MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227) Included a few clean-ups related to unused variables in tests. Reviewers: Rajini Sivaram --- .../kafka/api/LegacyAdminClientTest.scala | 14 +- .../kafka/integration/KafkaServerTestHarness.scala | 4 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 212 +++++++++---------- .../scala/unit/kafka/server/LogOffsetTest.scala | 224 +++++++++------------ 4 files changed, 210 insertions(+), 244 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala index b78946c..115ec05 100644 --- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala @@ -24,7 +24,7 @@ import java.lang.{Long => JLong} import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Test} @@ -147,16 +147,4 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { }, "Expected non-empty assignment") } - private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], - numRecords: Int, - tp: TopicPartition) { - val futures = (0 until numRecords).map { i => - val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) - debug(s"Sending this record: $record") - producer.send(record) - } - - futures.foreach(_.get) - } - } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 662d6d2..2c4a988 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -31,6 +31,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer} import java.util.Properties import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.utils.Time /** * A test harness that brings up some number of broker nodes @@ -82,6 +83,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { protected def trustStoreFile: Option[File] = None protected def serverSaslProperties: Option[Properties] = None protected def clientSaslProperties: Option[Properties] = None + protected def brokerTime(brokerId: Int): Time = Time.SYSTEM @Before override def setUp() { @@ -96,7 +98,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { // Add each broker to `servers` buffer as soon as it is created to ensure that brokers // are shutdown cleanly in tearDown even if a subsequent broker fails to start for (config <- configs) - servers += TestUtils.createServer(config) + servers += TestUtils.createServer(config, time = brokerTime(config.brokerId)) brokerList = TestUtils.bootstrapServers(servers, listenerName) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6c62e5e..550b929 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -98,7 +98,7 @@ class LogTest { val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L) // create a log - val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) // Test the segment rolling behavior when messages do not have a timestamp. mockTime.sleep(log.config.segmentMs + 1) @@ -143,7 +143,7 @@ class LogTest { @Test(expected = classOf[OutOfOrderSequenceException]) def testNonSequentialAppend(): Unit = { // create a log - val log = createLog(logDir, LogConfig(), brokerTopicStats) + val log = createLog(logDir, LogConfig()) val pid = 1L val epoch: Short = 0 @@ -156,7 +156,7 @@ class LogTest { @Test def testTruncateToEmptySegment(): Unit = { - val log = createLog(logDir, LogConfig(), brokerTopicStats) + val log = createLog(logDir, LogConfig()) // Force a segment roll by using a large offset. The first segment will be empty val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), @@ -181,7 +181,7 @@ class LogTest { // simulate the upgrade path by creating a new log with several segments, deleting the // snapshot files, and then reloading the log val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10) - var log = createLog(logDir, logConfig, brokerTopicStats) + var log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { @@ -196,7 +196,7 @@ class LogTest { deleteProducerSnapshotFiles() // Reload after clean shutdown - log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset) + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() @@ -205,7 +205,7 @@ class LogTest { deleteProducerSnapshotFiles() // Reload after unclean shutdown with recoveryPoint set to log end offset - log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset) + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset) assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) @@ -214,7 +214,7 @@ class LogTest { deleteProducerSnapshotFiles() // Reload after unclean shutdown with recoveryPoint set to 0 - log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = 0L) + log = createLog(logDir, logConfig, recoveryPoint = 0L) // Is this working as intended? expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) @@ -224,7 +224,7 @@ class LogTest { @Test def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10) - var log = createLog(logDir, logConfig, brokerTopicStats) + var log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { @@ -323,7 +323,7 @@ class LogTest { @Test def testProducerIdMapOffsetUpdatedForNonIdempotentData() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes))) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() @@ -514,7 +514,7 @@ class LogTest { @Test def testRebuildProducerIdMapWithCompactedData() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -557,7 +557,7 @@ class LogTest { @Test def testRebuildProducerStateWithEmptyCompactedBatch() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -598,7 +598,7 @@ class LogTest { @Test def testUpdateProducerIdMapWithCompactedData() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -631,7 +631,7 @@ class LogTest { @Test def testProducerIdMapTruncateTo() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0) log.takeProducerSnapshot() @@ -652,7 +652,7 @@ class LogTest { def testProducerIdMapTruncateToWithNoSnapshots() { // This ensures that the upgrade optimization path cannot be hit after initial loading val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort @@ -676,7 +676,7 @@ class LogTest { @Test def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L val epoch = 0.toShort @@ -696,7 +696,7 @@ class LogTest { log.close() - val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats) + val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt) @@ -705,7 +705,7 @@ class LogTest { @Test def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L val epoch = 0.toShort @@ -731,7 +731,7 @@ class LogTest { log.close() - val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats) + val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2) assertEquals(retainedLastSeqOpt, reloadedEntryOpt) @@ -741,7 +741,7 @@ class LogTest { def testProducerIdMapTruncateFullyAndStartAt() { val records = TestUtils.singletonRecords("foo".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() @@ -764,7 +764,7 @@ class LogTest { val pid1 = 1L val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0) val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() @@ -788,7 +788,7 @@ class LogTest { @Test def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() { val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) log.roll(1L) assertEquals(Some(1L), log.latestProducerSnapshotOffset) @@ -821,7 +821,7 @@ class LogTest { def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = { val producerId = 1L val logConfig = LogTest.createLogConfig(segmentBytes = 1024) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))), producerId = producerId, producerEpoch = 0, sequence = 0), @@ -853,7 +853,7 @@ class LogTest { @Test def testRebuildTransactionalState(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 137L val epoch = 5.toShort @@ -874,7 +874,7 @@ class LogTest { log.close() - val reopenedLog = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val reopenedLog = createLog(logDir, logConfig) reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) assertEquals(None, reopenedLog.firstUnstableOffset) } @@ -897,7 +897,7 @@ class LogTest { val pid = 23L val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs, - producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, brokerTopicStats = brokerTopicStats) + producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs) val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes)) log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0) @@ -913,7 +913,7 @@ class LogTest { @Test def testDuplicateAppends(): Unit = { // create a log - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val pid = 1L val epoch: Short = 0 @@ -987,7 +987,7 @@ class LogTest { @Test def testMultipleProducerIdsPerMemoryRecord() : Unit = { // create a log - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val epoch: Short = 0 val buffer = ByteBuffer.allocate(512) @@ -1033,7 +1033,7 @@ class LogTest { @Test def testDuplicateAppendToFollower() : Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch: Short = 0 val pid = 1L val baseSequence = 0 @@ -1054,7 +1054,7 @@ class LogTest { @Test def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L @@ -1106,7 +1106,7 @@ class LogTest { @Test(expected = classOf[ProducerFencedException]) def testOldProducerEpoch(): Unit = { // create a log - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val pid = 1L val newEpoch: Short = 1 val oldEpoch: Short = 0 @@ -1128,7 +1128,7 @@ class LogTest { val maxJitter = 20 * 60L // create a log val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) log.appendAsLeader(set, leaderEpoch = 0) @@ -1153,7 +1153,7 @@ class LogTest { val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -1168,7 +1168,7 @@ class LogTest { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0) } @@ -1178,7 +1178,7 @@ class LogTest { @Test def testAppendAndReadWithSequentialOffsets() { val logConfig = LogTest.createLogConfig(segmentBytes = 71) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray for(value <- values) @@ -1202,7 +1202,7 @@ class LogTest { @Test def testAppendAndReadWithNonSequentialOffsets() { val logConfig = LogTest.createLogConfig(segmentBytes = 72) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1226,7 +1226,7 @@ class LogTest { @Test def testReadAtLogGap() { val logConfig = LogTest.createLogConfig(segmentBytes = 300) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -1242,7 +1242,7 @@ class LogTest { @Test(expected = classOf[KafkaStorageException]) def testLogRollAfterLogHandlerClosed() { val logConfig = LogTest.createLogConfig() - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.closeHandlers() log.roll(1) } @@ -1250,7 +1250,7 @@ class LogTest { @Test def testReadWithMinMessage() { val logConfig = LogTest.createLogConfig(segmentBytes = 72) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1277,7 +1277,7 @@ class LogTest { @Test def testReadWithTooSmallMaxLength() { val logConfig = LogTest.createLogConfig(segmentBytes = 72) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1311,7 +1311,7 @@ class LogTest { createEmptyLogs(logDir, 1024) // set up replica log starting with offset 1024 and with one message (at offset 1024) val logConfig = LogTest.createLogConfig(segmentBytes = 1024) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) assertEquals("Reading at the log end offset should produce 0 byte read.", 0, @@ -1343,7 +1343,7 @@ class LogTest { def testLogRolls() { /* create a multipart log with 100 messages */ val logConfig = LogTest.createLogConfig(segmentBytes = 100) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds)) @@ -1381,7 +1381,7 @@ class LogTest { def testCompressedMessages() { /* this log should roll after every messageset */ val logConfig = LogTest.createLogConfig(segmentBytes = 110) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) @@ -1405,7 +1405,7 @@ class LogTest { logDir.mkdirs() // first test a log segment starting at 0 val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) for(i <- 0 until messagesToAppend) log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0) @@ -1439,7 +1439,7 @@ class LogTest { // append messages to log val configSegmentSize = messageSet.sizeInBytes - 1 val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) try { log.appendAsLeader(messageSet, leaderEpoch = 0) @@ -1464,7 +1464,7 @@ class LogTest { val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage) val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) try { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) @@ -1505,7 +1505,7 @@ class LogTest { // append messages to log val maxMessageSize = second.sizeInBytes - 1 val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // should be able to append the small message log.appendAsLeader(first, leaderEpoch = 0) @@ -1527,7 +1527,7 @@ class LogTest { val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096) - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) @@ -1554,12 +1554,12 @@ class LogTest { assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) } - log = createLog(logDir, logConfig, recoveryPoint = lastOffset, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig, recoveryPoint = lastOffset) verifyRecoveredLog(log, lastOffset) log.close() // test recovery case - log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig) verifyRecoveredLog(log, lastOffset) log.close() } @@ -1571,7 +1571,7 @@ class LogTest { def testBuildTimeIndexWhenNotAssigningOffsets() { val numMessages = 100 val logConfig = LogTest.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val messages = (0 until numMessages).map { i => MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes())) @@ -1591,7 +1591,7 @@ class LogTest { // publish the messages and close the log val numMessages = 200 val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.offsetIndex.file) @@ -1603,7 +1603,7 @@ class LogTest { timeIndexFiles.foreach(_.delete()) // reopen the log - log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) @@ -1625,7 +1625,7 @@ class LogTest { val numMessages = 200 val segmentSize = 200 val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0") - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) for (i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) @@ -1636,7 +1636,7 @@ class LogTest { timeIndexFiles.foreach(file => Files.delete(file.toPath)) // The rebuilt time index should be empty - log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1) for (segment <- log.logSegments.init) { assertEquals("The time index should be empty", 0, segment.timeIndex.entries) assertEquals("The time index file size should be 0", 0, segment.timeIndex.file.length) @@ -1651,7 +1651,7 @@ class LogTest { // publish the messages and close the log val numMessages = 200 val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.offsetIndex.file) @@ -1673,7 +1673,7 @@ class LogTest { } // reopen the log - log = createLog(logDir, logConfig, recoveryPoint = 200L, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig, recoveryPoint = 200L) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) @@ -1697,7 +1697,7 @@ class LogTest { // create a log val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (_ <- 1 to msgPerSeg) @@ -1749,7 +1749,7 @@ class LogTest { val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -1788,7 +1788,7 @@ class LogTest { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) @@ -1810,13 +1810,13 @@ class LogTest { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) // create a log val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000) - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) // add enough messages to roll over several segments then close and re-open and attempt to truncate for (_ <- 0 until 100) log.appendAsLeader(createRecords, leaderEpoch = 0) log.close() - log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) @@ -1831,7 +1831,7 @@ class LogTest { val asyncDeleteMs = 1000 val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000, retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -1864,7 +1864,7 @@ class LogTest { def testOpenDeletesObsoleteFiles() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -1874,13 +1874,13 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() log.close() - log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } @Test def testAppendMessageWithNullPayload() { - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) val head = log.readUncommitted(0, 4096, None).records.records.iterator.next() assertEquals(0, head.offset) @@ -1889,7 +1889,7 @@ class LogTest { @Test def testAppendWithOutOfOrderOffsetsThrowsException() { - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L) val buffer = ByteBuffer.allocate(512) @@ -1910,7 +1910,7 @@ class LogTest { @Test def testAppendBelowExpectedOffsetThrowsException() { - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) @@ -1957,7 +1957,7 @@ class LogTest { @Test def testAppendWithNoTimestamp(): Unit = { - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0) } @@ -1971,7 +1971,7 @@ class LogTest { for (_ <- 0 until 10) { // create a log and write some messages to it logDir.mkdirs() - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -2005,7 +2005,7 @@ class LogTest { def testOverCompactedLogRecovery(): Unit = { // append some messages to create some segments val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes())) val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes())) @@ -2038,7 +2038,7 @@ class LogTest { def testOverCompactedLogRecoveryMultiRecord(): Unit = { // append some messages to create some segments val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()), @@ -2077,7 +2077,7 @@ class LogTest { def testOverCompactedLogRecoveryMultiRecordV1(): Unit = { // append some messages to create some segments val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, @@ -2292,7 +2292,7 @@ class LogTest { assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) var recoveryPoint = 0L // create a log and write some messages to it - var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + var log = createLog(logDir, logConfig) for (_ <- 0 until 100) log.appendAsLeader(createRecords, leaderEpoch = 0) log.close() @@ -2300,7 +2300,7 @@ class LogTest { // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the // clean shutdown file exists. recoveryPoint = log.logEndOffset - log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + log = createLog(logDir, logConfig) assertEquals(recoveryPoint, log.logEndOffset) Utils.delete(cleanShutdownFile) } @@ -2461,7 +2461,7 @@ class LogTest { def testDeleteOldSegments() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -2511,7 +2511,7 @@ class LogTest { def testLogDeletionAfterClose() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -2529,7 +2529,7 @@ class LogTest { def testLogDeletionAfterDeleteRecords() { def createRecords = TestUtils.singletonRecords("test".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) for (_ <- 0 until 15) log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -2561,7 +2561,7 @@ class LogTest { def shouldDeleteSizeBasedSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2576,7 +2576,7 @@ class LogTest { def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() { def createRecords = TestUtils.singletonRecords("test".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2591,7 +2591,7 @@ class LogTest { def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() { def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2606,7 +2606,7 @@ class LogTest { def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() { def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2621,7 +2621,7 @@ class LogTest { def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2640,7 +2640,7 @@ class LogTest { def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2657,7 +2657,7 @@ class LogTest { //Given this partition is on leader epoch 72 val epoch = 72 - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) log.leaderEpochCache.assign(epoch, records.size) //When appending messages as a leader (i.e. assignOffsets = true) @@ -2689,7 +2689,7 @@ class LogTest { recs } - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) //When appending as follower (assignOffsets = false) for (i <- records.indices) @@ -2702,7 +2702,7 @@ class LogTest { def shouldTruncateLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val cache = epochCache(log) // Given three segments of 5 messages each @@ -2727,7 +2727,7 @@ class LogTest { def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val cache = epochCache(log) // Given three segments of 5 messages each @@ -2752,7 +2752,7 @@ class LogTest { def shouldTruncateLeaderEpochFileWhenTruncatingLog() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val cache = epochCache(log) //Given 2 segments, 10 messages per segment @@ -2797,7 +2797,7 @@ class LogTest { */ @Test def testLogRecoversForLeaderEpoch() { - val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, LogConfig()) val leaderEpochCache = epochCache(log) val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0) log.appendAsFollower(records = firstBatch) @@ -2819,7 +2819,7 @@ class LogTest { log.close() // reopen the log and recover from the beginning - val recoveredLog = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) + val recoveredLog = createLog(logDir, LogConfig()) val recoveredLeaderEpochCache = epochCache(recoveredLog) // epoch entries should be recovered @@ -2848,7 +2848,7 @@ class LogTest { def testFirstUnstableOffsetNoTransactionalData() { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes), @@ -2862,7 +2862,7 @@ class LogTest { @Test def testFirstUnstableOffsetWithTransactionalData() { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val pid = 137L val epoch = 5.toShort @@ -2900,7 +2900,7 @@ class LogTest { @Test def testTransactionIndexUpdated(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2941,7 +2941,7 @@ class LogTest { @Test def testFullTransactionIndexRecovery(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2984,7 +2984,7 @@ class LogTest { log.close() val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) - val reloadedLog = createLog(logDir, reloadedLogConfig, brokerTopicStats = brokerTopicStats) + val reloadedLog = createLog(logDir, reloadedLogConfig) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @@ -2992,7 +2992,7 @@ class LogTest { @Test def testRecoverOnlyLastSegment(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -3035,7 +3035,7 @@ class LogTest { log.close() val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) - val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats) + val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @@ -3043,7 +3043,7 @@ class LogTest { @Test def testRecoverLastSegmentWithNoSnapshots(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -3089,7 +3089,7 @@ class LogTest { log.close() val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) - val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats) + val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @@ -3098,7 +3098,7 @@ class LogTest { def testTransactionIndexUpdatedThroughReplication(): Unit = { val epoch = 0.toShort val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val buffer = ByteBuffer.allocate(2048) val pid1 = 1L @@ -3144,7 +3144,7 @@ class LogTest { val pid = 1L val epoch = 0.toShort val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val append = appendTransactionalAsLeader(log, pid, epoch) @@ -3160,7 +3160,7 @@ class LogTest { @Test def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid = 1L val appendPid = appendTransactionalAsLeader(log, pid, epoch) @@ -3184,7 +3184,7 @@ class LogTest { @Test def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid = 1L val appendPid = appendTransactionalAsLeader(log, pid, epoch) @@ -3211,7 +3211,7 @@ class LogTest { @Test def testLastStableOffsetWithMixedProducerData() { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) // for convenience, both producers share the same epoch val epoch = 5.toShort @@ -3272,7 +3272,7 @@ class LogTest { new SimpleRecord("c".getBytes)) val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes) - val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats) + val log = createLog(logDir, logConfig) val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0) assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) @@ -3487,7 +3487,7 @@ object LogTest { createLogConfig(indexIntervalBytes = 1) var log = createLog(logDir, logConfig, brokerTopicStats, scheduler, time) - var inputRecords = ListBuffer[Record]() + val inputRecords = ListBuffer[Record]() // References to files we want to "merge" to emulate offset overflow val toMerge = ListBuffer[File]() diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index aa8236a..dd4f7e3 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -21,71 +21,56 @@ import java.io.File import java.util.concurrent.atomic.AtomicInteger import java.util.{Properties, Random} -import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo} -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer import kafka.log.{Log, LogSegment} -import kafka.utils.TestUtils._ -import kafka.utils._ -import kafka.zk.ZooKeeperTestHarness +import kafka.network.SocketServer +import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, ListOffsetRequest, ListOffsetResponse} import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ -import org.junit.{After, Before, Test} - -class LogOffsetTest extends ZooKeeperTestHarness { - val random = new Random() - var logDir: File = null - var topicLogDir: File = null - var server: KafkaServer = null - var logSize: Int = 140 - var simpleConsumer: SimpleConsumer = null - var time: Time = new MockTime() - - @Before - override def setUp() { - super.setUp() - val config: Properties = createBrokerConfig(1) - config.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) - val logDirPath = config.getProperty("log.dir") - logDir = new File(logDirPath) - time = new MockTime() - server = TestUtils.createServer(KafkaConfig.fromProps(config), time) - simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "") - } +import org.junit.Test + +import scala.collection.JavaConverters._ + +class LogOffsetTest extends BaseRequestTest { + + private lazy val time = new MockTime + + protected override def numBrokers = 1 - @After - override def tearDown() { - simpleConsumer.close - TestUtils.shutdownServers(Seq(server)) - super.tearDown() + protected override def brokerTime(brokerId: Int) = time + + protected override def propertyOverrides(props: Properties): Unit = { + props.put("log.flush.interval.messages", "1") + props.put("num.partitions", "20") + props.put("log.retention.hours", "10") + props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString) + props.put("log.segment.bytes", "140") } @Test def testGetOffsetsForUnknownTopic() { - val topicAndPartition = TopicAndPartition("foo", 0) - val request = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10))) - val offsetResponse = simpleConsumer.getOffsetsBefore(request) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, - offsetResponse.partitionErrorAndOffsets(topicAndPartition).error) + val topicPartition = new TopicPartition("foo", 0) + val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava).build(0) + val response = sendListOffsetsRequest(request) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData.get(topicPartition).error) } @Test def testGetOffsetsAfterDeleteRecords() { - val topicPartition = "kafka-" + 0 - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) - // setup brokers in ZooKeeper as owners of partitions for this test adminZkClient.createTopic(topic, 1, 1) val logManager = server.getLogManager - waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined, + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, "Log for partition [topic,0] should be created") - val log = logManager.getLog(new TopicPartition(topic, part)).get + val log = logManager.getLog(topicPartition).get for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) @@ -95,93 +80,87 @@ class LogOffsetTest extends ZooKeeperTestHarness { log.maybeIncrementLogStartOffset(3) log.deleteOldSegments() - val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15) + val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15) assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)), - replicaId = 0) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets + TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), + "Leader should be elected") + val request = ListOffsetRequest.Builder.forReplica(0, 0) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build() + val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets) } @Test def testGetOffsetsBeforeLatestTime() { - val topicPartition = "kafka-" + 0 - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) - // setup brokers in ZooKeeper as owners of partitions for this test adminZkClient.createTopic(topic, 1, 1) val logManager = server.getLogManager - waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined, - "Log for partition [topic,0] should be created") - val log = logManager.getLog(new TopicPartition(topic, part)).get + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + s"Log for partition $topicPartition should be created") + val log = logManager.getLog(topicPartition).get for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() - val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15) + val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15) assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)), - replicaId = 0) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets + TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), + "Leader should be elected") + val request = ListOffsetRequest.Builder.forReplica(0, 0) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build() + val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets) // try to fetch using latest offset - val fetchResponse = simpleConsumer.fetch( - new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build()) - assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext) + val fetchRequest = FetchRequest.Builder.forConsumer(0, 1, + Map(topicPartition -> new FetchRequest.PartitionData(consumerOffsets.head, FetchRequest.INVALID_LOG_START_OFFSET, + 300 * 1024)).asJava).build() + val fetchResponse = sendFetchRequest(fetchRequest) + assertFalse(fetchResponse.responseData.get(topicPartition).records.batches.iterator.hasNext) } @Test def testEmptyLogsGetOffsets() { - val topicPartition = "kafka-" + random.nextInt(10) - val topicPartitionPath = TestUtils.tempDir().getAbsolutePath + "/" + topicPartition - topicLogDir = new File(topicPartitionPath) + val random = new Random + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, random.nextInt(10)) + val topicPartitionPath = s"${TestUtils.tempDir().getAbsolutePath}/$topic-${topicPartition.partition}" + val topicLogDir = new File(topicPartitionPath) topicLogDir.mkdir() - val topic = topicPartition.split("-").head - - // setup brokers in ZooKeeper as owners of partitions for this test - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) + createTopic(topic, numPartitions = 1, replicationFactor = 1) var offsetChanged = false for (_ <- 1 to 14) { - val topicAndPartition = TopicAndPartition(topic, 0) - val offsetRequest = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - - if(consumerOffsets.head == 1) { + val topicPartition = new TopicPartition(topic, 0) + val request = ListOffsetRequest.Builder.forReplica(0, 0) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)).asJava).build() + val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala + if (consumerOffsets.head == 1) offsetChanged = true - } } assertFalse(offsetChanged) } @Test def testGetOffsetsBeforeNow() { - val topicPartition = "kafka-" + random.nextInt(3) - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue + val random = new Random + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, random.nextInt(3)) - // setup brokers in ZooKeeper as owners of partitions for this test adminZkClient.createTopic(topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig) + val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig) for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) @@ -189,42 +168,42 @@ class LogOffsetTest extends ZooKeeperTestHarness { val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs - val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now, 15) + val offsets = server.apis.fetchOffsets(logManager, topicPartition, now, 15) assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 15)), replicaId = 0) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets + TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), + "Leader should be elected") + val request = ListOffsetRequest.Builder.forReplica(0, 0) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(now, 15)).asJava).build() + val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets) } @Test def testGetOffsetsBeforeEarliestTime() { - val topicPartition = "kafka-" + random.nextInt(3) - val topic = topicPartition.split("-").head - val part = Integer.valueOf(topicPartition.split("-").last).intValue + val random = new Random + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, random.nextInt(3)) - // setup brokers in ZooKeeper as owners of partitions for this test adminZkClient.createTopic(topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig) + val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig) for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() - val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10) + val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 10) assertEquals(Seq(0L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") - val topicAndPartition = TopicAndPartition(topic, part) - val offsetRequest = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10))) - val consumerOffsets = - simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets + TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), + "Leader should be elected") + val request = ListOffsetRequest.Builder.forReplica(0, 0) + .setOffsetData(Map(topicPartition -> + new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 10)).asJava).build() + val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala assertEquals(Seq(0L), consumerOffsets) } @@ -264,19 +243,16 @@ class LogOffsetTest extends ZooKeeperTestHarness { server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100) } - private def createBrokerConfig(nodeId: Int): Properties = { - val props = new Properties - props.put("broker.id", nodeId.toString) - props.put("port", TestUtils.RandomPort.toString()) - props.put("log.dir", TestUtils.tempDir().getAbsolutePath) - props.put("log.flush.interval.messages", "1") - props.put("enable.zookeeper", "false") - props.put("num.partitions", "20") - props.put("log.retention.hours", "10") - props.put("log.retention.check.interval.ms", (5*1000*60).toString) - props.put("log.segment.bytes", logSize.toString) - props.put("zookeeper.connect", zkConnect.toString) - props + private def server: KafkaServer = servers.head + + private def sendListOffsetsRequest(request: ListOffsetRequest, destination: Option[SocketServer] = None): ListOffsetResponse = { + val response = connectAndSend(request, ApiKeys.LIST_OFFSETS, destination = destination.getOrElse(anySocketServer)) + ListOffsetResponse.parse(response, request.version) + } + + private def sendFetchRequest(request: FetchRequest, destination: Option[SocketServer] = None): FetchResponse[MemoryRecords] = { + val response = connectAndSend(request, ApiKeys.FETCH, destination = destination.getOrElse(anySocketServer)) + FetchResponse.parse(response, request.version) } } -- To stop receiving notification emails like this one, please contact ijuma@apache.org.