kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1159459 - in /incubator/kafka/trunk/core/src: main/scala/kafka/javaapi/message/ main/scala/kafka/message/ test/scala/unit/kafka/javaapi/message/
Date Fri, 19 Aug 2011 00:47:35 GMT
Author: nehanarkhede
Date: Fri Aug 19 00:47:34 2011
New Revision: 1159459

URL: http://svn.apache.org/viewvc?rev=1159459&view=rev
Log:
CompressionUtils introduces a GZIP header while compressing empty message sets KAFKA-109;
patched by Neha; reviewed by Jun

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Fri Aug 19 00:47:34 2011
@@ -31,24 +31,8 @@ class ByteBufferMessageSet(private val b
   def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(compressionCodec match {
-      case NoCompressionCodec =>
-        val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-        val messageIterator = messages.iterator
-        while(messageIterator.hasNext) {
-          val message = messageIterator.next
-          message.serializeTo(buffer)
-        }
-        buffer.rewind
-        buffer
-      case _ =>
-        import scala.collection.JavaConversions._
-        val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
-        val buffer = ByteBuffer.allocate(message.serializedSize)
-        message.serializeTo(buffer)
-        buffer.rewind
-        buffer
-    }, 0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
+         0L, ErrorMapping.NoError)
   }
 
   def this(messages: java.util.List[Message]) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Aug 19 00:47:34 2011
@@ -43,22 +43,7 @@ class ByteBufferMessageSet(private val b
   private var deepValidByteCount = -1L
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(
-      compressionCodec match {
-        case NoCompressionCodec =>
-          val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-          for (message <- messages) {
-            message.serializeTo(buffer)
-          }
-          buffer.rewind
-          buffer
-        case _ =>
-          val message = CompressionUtils.compress(messages, compressionCodec)
-          val buffer = ByteBuffer.allocate(message.serializedSize)
-          message.serializeTo(buffer)
-          buffer.rewind
-          buffer
-      }, 0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
   }
 
   def this(messages: Message*) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Fri Aug 19 00:47:34 2011
@@ -51,7 +51,30 @@ object MessageSet {
    * The size of a size-delimited entry in a message set
    */
   def entrySize(message: Message): Int = LogOverhead + message.size
-  
+
+  def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer =
+    compressionCodec match {
+      case NoCompressionCodec =>
+        val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+        for (message <- messages) {
+          message.serializeTo(buffer)
+        }
+        buffer.rewind
+        buffer
+      case _ =>
+        messages.size match {
+          case 0 =>
+            val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+            buffer.rewind
+            buffer
+          case _ =>
+            val message = CompressionUtils.compress(messages, compressionCodec)
+            val buffer = ByteBuffer.allocate(message.serializedSize)
+            message.serializeTo(buffer)
+            buffer.rewind
+            buffer
+        }
+    }
 }
 
 /**

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Fri Aug 19 00:47:34 2011
@@ -69,7 +69,7 @@ trait BaseMessageSetTestCases extends JU
   @Test
   def testSizeInBytesWithCompression () {
     assertEquals("Empty message set should have 0 bytes.",
-                 30L,           // overhead of the GZIP output stream
+                 0L,           // overhead of the GZIP output stream
                  createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
   }
 }



Mime
View raw message