From: jgus@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables
Date: Tue, 20 Dec 2016 19:17:46 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 7f4b278c0 -> 3930dd7e7


KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables

This was changed in b58b6a1bef0 and caused the
`ReplicaVerificationToolTest.test_replica_lags` system test to start failing.
I also added a unit test and a couple of other minor clean-ups.
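The root cause is visible in the ReplicaVerificationTool.scala hunks below:
the old code kept `shallowEntries.asScala` (an iterable) in the map and
called `.iterator` on it inside the verification loop, so every pass
restarted from the first entry instead of advancing through the log. A
minimal standalone sketch of the difference (hypothetical names, not the
tool's actual code):

    object IteratorVsIterable extends App {
      val entries: Iterable[Int] = Seq(1, 2, 3)

      // Buggy pattern: a fresh iterator on every pass always yields the first entry.
      val perPass = (1 to 3).map(_ => entries.iterator.next())
      println(perPass) // Vector(1, 1, 1)

      // Fixed pattern: a single iterator held across passes advances as expected.
      val shared = entries.iterator
      val acrossPasses = (1 to 3).map(_ => shared.next())
      println(acrossPasses) // Vector(1, 2, 3)
    }

The new unit test exercises this by injecting a `println` function into
`verifyCheckSum` and asserting on the captured output, as shown in the
ReplicaVerificationToolTest.scala diff.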
Author: Ismael Juma
Reviewers: Jason Gustafson

Closes #2280 from ijuma/kafka-4554-fix-replica-buffer-verify-checksum


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3930dd7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3930dd7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3930dd7e

Branch: refs/heads/trunk
Commit: 3930dd7e7502ab94a7594a0ca41f27638663ae7c
Parents: 7f4b278
Author: Ismael Juma
Authored: Tue Dec 20 11:01:46 2016 -0800
Committer: Jason Gustafson
Committed: Tue Dec 20 11:01:46 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   | 24 --------
 .../kafka/tools/ReplicaVerificationTool.scala   | 21 +++----
 .../tools/ReplicaVerificationToolTest.scala     | 60 ++++++++++++++++++++
 .../unit/kafka/server/ProduceRequestTest.scala  | 19 +++----
 .../apache/kafka/streams/KafkaStreamsTest.java  |  2 +-
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 428b5a0..c39f402 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,12 +21,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Utils;
 
 import javax.xml.bind.DatatypeConverter;
@@ -180,24 +174,6 @@ public class TestUtils {
         return file;
     }
 
-    /**
-     * Create a records buffer including the offset and message size at the start, which is required if the buffer is to
-     * be sent as part of `ProduceRequest`. This is the reason why we can't use
-     * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this
-     * constructor does not include either of these fields.
-     */
-    public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
-        int bufferSize = 0;
-        for (final Record record : records)
-            bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
-        final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
-        long nextOffset = offset;
-        for (final Record record : records)
-            builder.append(nextOffset++, record);
-        return builder.build().buffer();
-    }
-
     public static Properties producerConfig(final String bootstrapServers,
                                             final Class keySerializer,
                                             final Class valueSerializer,


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 98e0414..f6d5153 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -252,9 +252,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPa
       val offsetRequest = OffsetRequest(initialOffsetMap)
       val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
       assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
-      offsetResponse.partitionErrorAndOffsets.foreach{
-        case (topicAndPartition, partitionOffsetResponse) =>
-          fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
+      offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) =>
+        fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
       }
     }
   }
@@ -267,7 +266,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPa
     fetchOffsetMap.get(topicAndPartition)
   }
 
-  def verifyCheckSum() {
+  def verifyCheckSum(println: String => Unit) {
     debug("Begin verification")
     maxLag = -1L
     for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
@@ -275,8 +274,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPa
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
         "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
           + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val logEntriesMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
-        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.asScala
+      val logEntryIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.iterator
       }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
@@ -284,9 +283,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPa
       var isMessageInAllReplicas = true
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ((replicaId, logEntries) <- logEntriesMap) {
+        for ((replicaId, logEntriesIterator) <- logEntryIteratorMap) {
           try {
-            val logEntriesIterator = logEntries.iterator
             if (logEntriesIterator.hasNext) {
               val logEntry = logEntriesIterator.next()
@@ -378,9 +376,8 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
     }
 
     if (response != null) {
-      response.data.foreach {
-        case(topicAndPartition, partitionData) =>
-          replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+      response.data.foreach { case (topicAndPartition, partitionData) =>
+        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
       }
     } else {
       for (topicAndPartition <- topicAndPartitions)
@@ -397,7 +394,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
       // one of the fetchers will do the verification
       if (doVerification) {
         debug("Do verification")
-        replicaBuffer.verifyCheckSum()
+        replicaBuffer.verifyCheckSum(println)
         replicaBuffer.createNewFetcherBarrier()
         replicaBuffer.createNewVerificationBarrier()
         debug("Created new barrier")


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
new file mode 100644
index 0000000..ffa3474
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.api.FetchResponsePartitionData
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.junit.Test
+import org.junit.Assert.assertTrue
+
+class ReplicaVerificationToolTest {
+
+  @Test
+  def testReplicaBufferVerifyChecksum(): Unit = {
+    val sb = new StringBuilder
+
+    val expectedReplicasPerTopicAndPartition = Map(
+      TopicAndPartition("a", 0) -> 3,
+      TopicAndPartition("a", 1) -> 3,
+      TopicAndPartition("b", 0) -> 2
+    )
+
+    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0)
+    expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
+      (0 until numReplicas).foreach { replicaId =>
+        val records = (0 to 5).map { index =>
+          Record.create(s"key $index".getBytes, s"value $index".getBytes)
+        }
+        val initialOffset = 4
+        val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
+        replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE.code(), hw = 20,
+          new ByteBufferMessageSet(memoryRecords.buffer)))
+      }
+    }
+
+    replicaBuffer.verifyCheckSum(line => sb.append(s"$line\n"))
+    val output = sb.toString.trim
+
+    assertTrue(s"Max lag information should be in output: `$output`",
+      output.endsWith(": max lag is 10 for partition [a,0] at offset 10 among 3 partitions"))
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 51be54c..4dfe4b5 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,12 +17,9 @@
 
 package kafka.server
 
-import java.nio.ByteBuffer
-
 import kafka.utils.TestUtils
-import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
@@ -40,9 +37,9 @@ class ProduceRequestTest extends BaseRequestTest {
   def testSimpleProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
-    def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
       val topicPartition = new TopicPartition("topic", partition)
-      val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+      val partitionRecords = Map(topicPartition -> memoryRecords)
       val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
@@ -53,10 +50,10 @@ class ProduceRequestTest extends BaseRequestTest {
       partitionResponse
     }
 
-    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+    sendAndCheck(MemoryRecords.withRecords(
      Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
 
-    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+    sendAndCheck(MemoryRecords.withRecords(CompressionType.GZIP,
       Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
       Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
@@ -73,12 +70,12 @@ class ProduceRequestTest extends BaseRequestTest {
   def testCorruptLz4ProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
     val timestamp = 1000000
-    val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+    val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
       Record.create(timestamp, "key".getBytes, "value".getBytes))
     // Change the lz4 checksum value so that it doesn't match the contents
-    recordBuffer.array.update(40, 0)
+    memoryRecords.buffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+    val partitionRecords = Map(topicPartition -> memoryRecords)
     val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 37809bf..5804407 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -271,7 +271,7 @@ public class KafkaStreamsTest {
 
         @Override
         public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-            Long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
+            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
             this.numChanges++;
             this.oldState = oldState;
             this.newState = newState;