kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3996; ByteBufferMessageSet.writeTo() should be non-blocking
Date Wed, 27 Jul 2016 14:51:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4059f0721 -> 64842f47f


KAFKA-3996; ByteBufferMessageSet.writeTo() should be non-blocking

Also:
* Introduce a blocking variant to be used by `FileMessageSet.append`
* Add tests
* Minor clean-ups

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1669 from ijuma/kafka-3996-byte-buffer-message-set-write-to-non-blocking


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64842f47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64842f47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64842f47

Branch: refs/heads/trunk
Commit: 64842f47f8ad21c942be2e2dd1edb56e3bc76cb8
Parents: 4059f07
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Jul 27 07:51:40 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jul 27 07:51:40 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/network/ByteBufferSend.java    |  2 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  2 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 18 ++++-
 .../kafka/message/BaseMessageSetTestCases.scala | 80 +++++++++++++++++---
 .../message/ByteBufferMessageSetTest.scala      | 10 +++
 5 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index d7357b2..9e213ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -23,9 +23,9 @@ import java.nio.channels.GatheringByteChannel;
 public class ByteBufferSend implements Send {
 
     private final String destination;
+    private final int size;
     protected final ByteBuffer[] buffers;
     private int remaining;
-    private int size;
     private boolean pending = false;
 
     public ByteBufferSend(String destination, ByteBuffer... buffers) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 165c322..8e92f95 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -294,7 +294,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * Append these messages to the message set
    */
   def append(messages: ByteBufferMessageSet) {
-    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
+    val written = messages.writeFullyTo(channel)
     _size.getAndAdd(written)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 98f6131..15d4eea 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -294,16 +294,28 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   }
 
   /** Write the messages in this set to the given channel */
-  def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = {
-    // Ignore offset and size from input. We just want to write the whole buffer to the channel.
+  def writeFullyTo(channel: GatheringByteChannel): Int = {
     buffer.mark()
     var written = 0
-    while(written < sizeInBytes)
+    while (written < sizeInBytes)
       written += channel.write(buffer)
     buffer.reset()
     written
   }
 
+  /** Write the messages in this set to the given channel starting at the given offset byte.
+    * Less than the complete amount may be written, but no more than maxSize can be. The number
+    * of bytes written is returned */
+  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int = {
+    if (offset > Int.MaxValue)
+      throw new IllegalArgumentException(s"offset should not be larger than Int.MaxValue: $offset")
+    val dup = buffer.duplicate()
+    val position = offset.toInt
+    dup.position(position)
+    dup.limit(math.min(buffer.limit, position + maxSize))
+    channel.write(dup)
+  }
+
   override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
     for (messageAndOffset <- shallowIterator) {
       if (messageAndOffset.message.magic != expectedMagicValue)

http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 10687d1..0d86128 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -17,15 +17,45 @@
 
 package kafka.message
 
-import java.io.RandomAccessFile
+import java.nio.ByteBuffer
+import java.nio.channels.{FileChannel, GatheringByteChannel}
+import java.nio.file.StandardOpenOption
+
 import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.log.FileMessageSet
+import kafka.utils.TestUtils
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
+import scala.collection.mutable.ArrayBuffer
+
 trait BaseMessageSetTestCases extends JUnitSuite {
-  
+
+  private class StubByteChannel(bytesToConsumePerBuffer: Int) extends GatheringByteChannel {
+
+    val data = new ArrayBuffer[Byte]
+
+    def write(srcs: Array[ByteBuffer], offset: Int, length: Int): Long = {
+      srcs.map { src =>
+        val array = new Array[Byte](math.min(bytesToConsumePerBuffer, src.remaining))
+        src.get(array)
+        data ++= array
+        array.length
+      }.sum
+    }
+
+    def write(srcs: Array[ByteBuffer]): Long = write(srcs, 0, srcs.map(_.remaining).sum)
+
+    def write(src: ByteBuffer): Int = write(Array(src)).toInt
+
+    def isOpen: Boolean = true
+
+    def close() {}
+
+  }
+
+
   val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes))
   
   def createMessageSet(messages: Seq[Message]): MessageSet
@@ -56,20 +86,48 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   @Test
   def testWriteTo() {
     // test empty message set
-    testWriteToWithMessageSet(createMessageSet(Array[Message]()))
-    testWriteToWithMessageSet(createMessageSet(messages))
+    checkWriteToWithMessageSet(createMessageSet(Array[Message]()))
+    checkWriteToWithMessageSet(createMessageSet(messages))
   }
 
-  def testWriteToWithMessageSet(set: MessageSet) {
+  /* Tests that writing to a channel that doesn't consume all the bytes in the buffer works correctly */
+  @Test
+  def testWriteToChannelThatConsumesPartially() {
+    val bytesToConsumePerBuffer = 50
+    val messages = (0 until 10).map(_ => new Message(TestUtils.randomString(100).getBytes))
+    val messageSet = createMessageSet(messages)
+    val messageSetSize = messageSet.sizeInBytes
+
+    val channel = new StubByteChannel(bytesToConsumePerBuffer)
+
+    var remaining = messageSetSize
+    var iterations = 0
+    while (remaining > 0) {
+      remaining -= messageSet.writeTo(channel, messageSetSize - remaining, remaining)
+      iterations += 1
+    }
+
+    assertEquals((messageSetSize / bytesToConsumePerBuffer) + 1, iterations)
+    checkEquals(new ByteBufferMessageSet(ByteBuffer.wrap(channel.data.toArray)).iterator, messageSet.iterator)
+  }
+
+  def checkWriteToWithMessageSet(messageSet: MessageSet) {
+    checkWriteWithMessageSet(messageSet, messageSet.writeTo(_, 0, messageSet.sizeInBytes))
+  }
+
+  def checkWriteWithMessageSet(set: MessageSet, write: GatheringByteChannel => Long) {
     // do the write twice to ensure the message set is restored to its original state
-    for(i <- List(0,1)) {
+    for (_ <- 0 to 1) {
       val file = tempFile()
-      val channel = new RandomAccessFile(file, "rw").getChannel()
-      val written = set.writeTo(channel, 0, 1024)
-      assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-      val newSet = new FileMessageSet(file, channel)
-      checkEquals(set.iterator, newSet.iterator)
+      val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
+      try {
+        val written = write(channel)
+        assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
+        val newSet = new FileMessageSet(file, channel)
+        checkEquals(set.iterator, newSet.iterator)
+      } finally channel.close()
     }
   }
   
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/64842f47/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 8f66d62..4810009 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -380,6 +380,16 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                        messageTimestampType = TimestampType.CREATE_TIME,
                                                                        messageTimestampDiffMaxMs = 5000L)._1, offset)
   }
+
+  @Test
+  def testWriteFullyTo() {
+    checkWriteFullyToWithMessageSet(createMessageSet(Array[Message]()))
+    checkWriteFullyToWithMessageSet(createMessageSet(messages))
+  }
+
+  def checkWriteFullyToWithMessageSet(messageSet: ByteBufferMessageSet) {
+    checkWriteWithMessageSet(messageSet, messageSet.writeFullyTo)
+  }
   
   /* check that offsets are assigned based on byte offset from the given base offset */
   def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {


Mime
View raw message