Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 871F2200B58 for ; Wed, 27 Jul 2016 16:51:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 85CDA160A90; Wed, 27 Jul 2016 14:51:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A7BB1160A6F for ; Wed, 27 Jul 2016 16:51:47 +0200 (CEST) Received: (qmail 37603 invoked by uid 500); 27 Jul 2016 14:51:46 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 37594 invoked by uid 99); 27 Jul 2016 14:51:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jul 2016 14:51:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A548CE02A2; Wed, 27 Jul 2016 14:51:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: junrao@apache.org To: commits@kafka.apache.org Message-Id: <4f1ba0ea5c144f329c629181b19b83c8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-3996; ByteBufferMessageSet.writeTo() should be non-blocking Date: Wed, 27 Jul 2016 14:51:46 +0000 (UTC) archived-at: Wed, 27 Jul 2016 14:51:48 -0000 Repository: kafka Updated Branches: refs/heads/trunk 4059f0721 -> 64842f47f KAFKA-3996; ByteBufferMessageSet.writeTo() should be non-blocking Also: * Introduce a blocking variant to be used by `FileMessageSet.append` * Add tests * Minor clean-ups Author: Ismael Juma Reviewers: Jun Rao Closes #1669 from ijuma/kafka-3996-byte-buffer-message-set-write-to-non-blocking Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64842f47 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64842f47 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64842f47 Branch: refs/heads/trunk Commit: 64842f47f8ad21c942be2e2dd1edb56e3bc76cb8 Parents: 4059f07 Author: Ismael Juma Authored: Wed Jul 27 07:51:40 2016 -0700 Committer: Jun Rao Committed: Wed Jul 27 07:51:40 2016 -0700 ---------------------------------------------------------------------- .../kafka/common/network/ByteBufferSend.java | 2 +- .../main/scala/kafka/log/FileMessageSet.scala | 2 +- .../kafka/message/ByteBufferMessageSet.scala | 18 ++++- .../kafka/message/BaseMessageSetTestCases.scala | 80 +++++++++++++++++--- .../message/ByteBufferMessageSetTest.scala | 10 +++ 5 files changed, 96 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index d7357b2..9e213ec 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -23,9 +23,9 @@ import java.nio.channels.GatheringByteChannel; public class ByteBufferSend implements Send { private final String destination; + private final int size; protected final ByteBuffer[] buffers; private int remaining; - private int size; private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer... buffers) { http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 165c322..8e92f95 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -294,7 +294,7 @@ class FileMessageSet private[kafka](@volatile var file: File, * Append these messages to the message set */ def append(messages: ByteBufferMessageSet) { - val written = messages.writeTo(channel, 0, messages.sizeInBytes) + val written = messages.writeFullyTo(channel) _size.getAndAdd(written) } http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 98f6131..15d4eea 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -294,16 +294,28 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi } /** Write the messages in this set to the given channel */ - def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = { - // Ignore offset and size from input. We just want to write the whole buffer to the channel. + def writeFullyTo(channel: GatheringByteChannel): Int = { buffer.mark() var written = 0 - while(written < sizeInBytes) + while (written < sizeInBytes) written += channel.write(buffer) buffer.reset() written } + /** Write the messages in this set to the given channel starting at the given offset byte. + * Less than the complete amount may be written, but no more than maxSize can be. The number + * of bytes written is returned */ + def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int = { + if (offset > Int.MaxValue) + throw new IllegalArgumentException(s"offset should not be larger than Int.MaxValue: $offset") + val dup = buffer.duplicate() + val position = offset.toInt + dup.position(position) + dup.limit(math.min(buffer.limit, position + maxSize)) + channel.write(dup) + } + override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { for (messageAndOffset <- shallowIterator) { if (messageAndOffset.message.magic != expectedMagicValue) http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index 10687d1..0d86128 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -17,15 +17,45 @@ package kafka.message -import java.io.RandomAccessFile +import java.nio.ByteBuffer +import java.nio.channels.{FileChannel, GatheringByteChannel} +import java.nio.file.StandardOpenOption + import org.junit.Assert._ import kafka.utils.TestUtils._ import kafka.log.FileMessageSet +import kafka.utils.TestUtils import org.scalatest.junit.JUnitSuite import org.junit.Test +import scala.collection.mutable.ArrayBuffer + trait BaseMessageSetTestCases extends JUnitSuite { - + + private class StubByteChannel(bytesToConsumePerBuffer: Int) extends GatheringByteChannel { + + val data = new ArrayBuffer[Byte] + + def write(srcs: Array[ByteBuffer], offset: Int, length: Int): Long = { + srcs.map { src => + val array = new Array[Byte](math.min(bytesToConsumePerBuffer, src.remaining)) + src.get(array) + data ++= array + array.length + }.sum + } + + def write(srcs: Array[ByteBuffer]): Long = write(srcs, 0, srcs.map(_.remaining).sum) + + def write(src: ByteBuffer): Int = write(Array(src)).toInt + + def isOpen: Boolean = true + + def close() {} + + } + + val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes)) def createMessageSet(messages: Seq[Message]): MessageSet @@ -56,20 +86,48 @@ trait BaseMessageSetTestCases extends JUnitSuite { @Test def testWriteTo() { // test empty message set - testWriteToWithMessageSet(createMessageSet(Array[Message]())) - testWriteToWithMessageSet(createMessageSet(messages)) + checkWriteToWithMessageSet(createMessageSet(Array[Message]())) + checkWriteToWithMessageSet(createMessageSet(messages)) } - def testWriteToWithMessageSet(set: MessageSet) { + /* Tests that writing to a channel that doesn't consume all the bytes in the buffer works correctly */ + @Test + def testWriteToChannelThatConsumesPartially() { + val bytesToConsumePerBuffer = 50 + val messages = (0 until 10).map(_ => new Message(TestUtils.randomString(100).getBytes)) + val messageSet = createMessageSet(messages) + val messageSetSize = messageSet.sizeInBytes + + val channel = new StubByteChannel(bytesToConsumePerBuffer) + + var remaining = messageSetSize + var iterations = 0 + while (remaining > 0) { + remaining -= messageSet.writeTo(channel, messageSetSize - remaining, remaining) + iterations += 1 + } + + assertEquals((messageSetSize / bytesToConsumePerBuffer) + 1, iterations) + checkEquals(new ByteBufferMessageSet(ByteBuffer.wrap(channel.data.toArray)).iterator, messageSet.iterator) + } + + def checkWriteToWithMessageSet(messageSet: MessageSet) { + checkWriteWithMessageSet(messageSet, messageSet.writeTo(_, 0, messageSet.sizeInBytes)) + } + + def checkWriteWithMessageSet(set: MessageSet, write: GatheringByteChannel => Long) { // do the write twice to ensure the message set is restored to its original state - for(i <- List(0,1)) { + for (_ <- 0 to 1) { val file = tempFile() - val channel = new RandomAccessFile(file, "rw").getChannel() - val written = set.writeTo(channel, 0, 1024) - assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written) - val newSet = new FileMessageSet(file, channel) - checkEquals(set.iterator, newSet.iterator) + val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE) + try { + val written = write(channel) + assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written) + val newSet = new FileMessageSet(file, channel) + checkEquals(set.iterator, newSet.iterator) + } finally channel.close() } } } + http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 8f66d62..4810009 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -380,6 +380,16 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { messageTimestampType = TimestampType.CREATE_TIME, messageTimestampDiffMaxMs = 5000L)._1, offset) } + + @Test + def testWriteFullyTo() { + checkWriteFullyToWithMessageSet(createMessageSet(Array[Message]())) + checkWriteFullyToWithMessageSet(createMessageSet(messages)) + } + + def checkWriteFullyToWithMessageSet(messageSet: ByteBufferMessageSet) { + checkWriteWithMessageSet(messageSet, messageSet.writeFullyTo) + } /* check that offsets are assigned based on byte offset from the given base offset */ def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {