activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1245934 - in /activemq/activemq-apollo/trunk/apollo-leveldb: ./ src/main/scala/org/apache/activemq/apollo/broker/store/ src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/ src/main/scala/org/apache/activemq/apollo/broker/st...
Date Sat, 18 Feb 2012 14:49:24 GMT
Author: chirino
Date: Sat Feb 18 14:49:24 2012
New Revision: 1245934

URL: http://svn.apache.org/viewvc?rev=1245934&view=rev
Log:
Add option which attempts to Snappy compress logged messages to reduce disk IO.

Added:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml Sat Feb 18 14:49:24 2012
@@ -68,6 +68,19 @@
       <version>${leveldbjni-version}</version>
     </dependency>
 
+    <!-- For Optional Snappy Compression -->
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+      <version>0.2</version>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Since we implement a jade template to display the LevelDB status -->
     <dependency>

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala?rev=1245934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala Sat Feb 18 14:49:24 2012
@@ -0,0 +1,139 @@
+package org.apache.activemq.apollo.broker.store
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.ByteBuffer
+import org.xerial.snappy.{Snappy => Xerial}
+import org.iq80.snappy.{Snappy => Iq80}
+import org.fusesource.hawtbuf.Buffer
+
+/**
+ * <p>
+ * A Snappy abstraction which attempts to use the iq80 implementation and falls back
+ * to the xerial Snappy implementation if it cannot be loaded.  You can change the
+ * load order by setting the 'leveldb.snappy' system property.  Example:
+ *
+ * <code>
+ * -Dleveldb.snappy=xerial,iq80
+ * </code>
+ *
+ * The system property can also be configured with the name of a class which
+ * implements the SnappyTrait interface.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+package object leveldb  {
+
+  final val Snappy = {
+    var attempt:SnappyTrait = null
+    System.getProperty("leveldb.snappy", "iq80,xerial").split(",").foreach { x =>
+      if( attempt==null ) {
+        try {
+            var name = x.trim();
+            name = name.toLowerCase match {
+              case "xerial" => "org.apache.activemq.apollo.broker.store.leveldb.XerialSnappy"
+              case "iq80" => "org.apache.activemq.apollo.broker.store.leveldb.IQ80Snappy"
+              case _ => name
+            }
+            attempt = Thread.currentThread().getContextClassLoader().loadClass(name).newInstance().asInstanceOf[SnappyTrait];
+        } catch {
+          case _ =>
+        }
+      }
+    }
+    attempt
+  }
+
+
+  trait SnappyTrait {
+    
+    def uncompressed_length(input: Buffer):Int
+    def uncompress(input: Buffer, output:Buffer): Int
+    
+    def max_compressed_length(length: Int): Int
+    def compress(input: Buffer, output: Buffer): Int
+
+    def compress(input: Buffer):Buffer = {
+      val compressed = new Buffer(max_compressed_length(input.length))
+      compressed.length = compress(input, compressed)
+      compressed
+    }
+    
+    def compress(text: String): Buffer = {
+      val uncompressed = new Buffer(text.getBytes("UTF-8"))
+      val compressed = new Buffer(max_compressed_length(uncompressed.length))
+      compressed.length = compress(uncompressed, compressed)
+      return compressed
+    }
+    
+    def uncompress(input: Buffer):Buffer = {
+      val uncompressed = new Buffer(uncompressed_length(input))
+      uncompressed.length = uncompress(input, uncompressed)
+      uncompressed
+    }
+
+    def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer): Int = {
+      val input = if (compressed.hasArray) {
+        new Buffer(compressed.array, compressed.arrayOffset + compressed.position, compressed.remaining)
+      } else {
+        val t = new Buffer(compressed.remaining)
+        compressed.mark
+        compressed.get(t.data)
+        compressed.reset
+        t
+      }
+
+      val output = if (uncompressed.hasArray) {
+        new Buffer(uncompressed.array, uncompressed.arrayOffset + uncompressed.position, uncompressed.capacity()-uncompressed.position)
+      } else {
+        new Buffer(uncompressed_length(input))
+      }
+
+      output.length = uncompress(input, output)
+
+      if (uncompressed.hasArray) {
+        uncompressed.limit(uncompressed.position + output.length)
+      } else {
+        val p = uncompressed.position
+        uncompressed.limit(uncompressed.capacity)
+        uncompressed.put(output.data, output.offset, output.length)
+        uncompressed.flip.position(p)
+      }
+      return output.length
+    }
+  }
+
+}
+package leveldb  {
+
+  class XerialSnappy extends SnappyTrait {
+    override def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer) = Xerial.uncompress(compressed, uncompressed)
+    def uncompressed_length(input: Buffer) = Xerial.uncompressedLength(input.data, input.offset, input.length)
+    def uncompress(input: Buffer, output: Buffer) = Xerial.uncompress(input.data, input.offset, input.length, output.data, output.offset)
+    def max_compressed_length(length: Int) = Xerial.maxCompressedLength(length)
+    def compress(input: Buffer, output: Buffer) = Xerial.compress(input.data, input.offset, input.length, output.data, output.offset)
+    override def compress(text: String) = new Buffer(Xerial.compress(text))
+  }
+
+  class IQ80Snappy extends SnappyTrait {
+    def uncompressed_length(input: Buffer) = Iq80.getUncompressedLength(input.data, input.offset)
+    def uncompress(input: Buffer, output: Buffer): Int = Iq80.uncompress(input.data, input.offset, input.length, output.data, output.offset)
+    def compress(input: Buffer, output: Buffer): Int = Iq80.compress(input.data, input.offset, input.length, output.data, output.offset)
+    def max_compressed_length(length: Int) = Iq80.maxCompressedLength(length)
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java Sat Feb 18 14:49:24 2012
@@ -70,6 +70,9 @@ public class LevelDBStoreDTO extends Sto
     @XmlAttribute(name="index_compression")
     public String index_compression;
 
+    @XmlAttribute(name="log_compression")
+    public String log_compression;
+
     @XmlAttribute(name="index_factory")
     public String index_factory;
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Sat Feb 18 14:49:24 2012
@@ -66,11 +66,13 @@ object LevelDBClient extends Log {
   final val LOG_ADD_QUEUE           = 1.toByte
   final val LOG_REMOVE_QUEUE        = 2.toByte
   final val LOG_ADD_MESSAGE         = 3.toByte
-  final val LOG_REMOVE_MESSAGE      = 4.toByte
   final val LOG_ADD_QUEUE_ENTRY     = 5.toByte
   final val LOG_REMOVE_QUEUE_ENTRY  = 6.toByte
   final val LOG_MAP_ENTRY           = 7.toByte
 
+  final val LOG_ADD_MESSAGE_SNAPPY  = (LOG_ADD_MESSAGE+100).toByte
+  final val LOG_MAP_ENTRY_SNAPPY    = (LOG_MAP_ENTRY+100).toByte
+
   final val LOG_SUFFIX  = ".log"
   final val INDEX_SUFFIX  = ".index"
 
@@ -198,6 +200,7 @@ class LevelDBClient(store: LevelDBStore)
 
   var log:RecordLog = _
 
+  var snappy_compress_logs = false
   var index:RichDB = _
   var index_options:Options = _
 
@@ -256,6 +259,10 @@ class LevelDBClient(store: LevelDBStore)
       case _ => CompressionType.SNAPPY
     }) )
 
+    if( Option(config.log_compression).map(_.toLowerCase).getOrElse("snappy") == "snappy" && Snappy!=null ) {
+      snappy_compress_logs = true
+    }
+    
     index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
     index_options.logger(new Logger() {
       def log(msg: String) = trace(msg.stripSuffix("\n"))
@@ -412,7 +419,7 @@ class LevelDBClient(store: LevelDBStore)
                     val record = QueuePB.FACTORY.parseUnframed(data)
                     index.put(encode_key(queue_prefix, record.getKey), data)
 
-                  case LOG_REMOVE_QUEUE =>
+                  case LOG_REMOVE_QUEUE=>
                     replay_operations+=1
                     val ro = new ReadOptions
                     ro.fillCache(false)
@@ -430,7 +437,7 @@ class LevelDBClient(store: LevelDBStore)
                       true
                     }
 
-                  case LOG_MAP_ENTRY =>
+                  case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
                     replay_operations+=1
                     val entry = MapEntryPB.FACTORY.parseUnframed(data)
                     if (entry.getValue == null) {
@@ -724,7 +731,7 @@ class LevelDBClient(store: LevelDBStore)
   def add_queue(record: QueueRecord, callback:Runnable) = {
     retry_using_index {
       log.appender { appender =>
-        val value:Buffer = PBSupport.encode_queue_record(record)
+        val value:Buffer = PBSupport.to_pb(record).freeze().toUnframedBuffer
         appender.append(LOG_ADD_QUEUE, value)
         index.put(encode_key(queue_prefix, record.key), value)
       }
@@ -795,7 +802,9 @@ class LevelDBClient(store: LevelDBStore)
                 entry.setValue(value)
                 batch.put(encode_key(map_prefix, key), value.toByteArray)
               }
-              appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
+              var log_data = entry.freeze().toUnframedBuffer
+              
+              appender.append(LOG_MAP_ENTRY, log_data)
             }
 
             uow.actions.foreach { case (msg, action) =>
@@ -804,10 +813,25 @@ class LevelDBClient(store: LevelDBStore)
               var log_info:LogInfo = null
 
               if (message_record != null) {
-                val message_data = PBSupport.encode_message_record(message_record)
-                val len = message_data.length
-                val p = appender.append(LOG_ADD_MESSAGE, message_data)
-                locator = (p._1, len)
+                
+                val pb = new MessagePB.Bean
+                pb.setProtocol(message_record.protocol)
+                pb.setSize(message_record.size)
+                pb.setValue(message_record.buffer)
+                var message_data = pb.freeze().toUnframedBuffer
+
+                val p = if( snappy_compress_logs ) {
+                  val compressed = Snappy.compress(message_data)
+                  if( compressed.length < message_data.length ) {
+                    message_data = compressed
+                    appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+                  } else {
+                    appender.append(LOG_ADD_MESSAGE, message_data)
+                  }
+                } else {
+                  appender.append(LOG_ADD_MESSAGE, message_data)
+                }
+                locator = (p._1, message_data.length)
                 log_info = p._2
                 message_record.locator.set(locator);
               }
@@ -819,6 +843,7 @@ class LevelDBClient(store: LevelDBStore)
                 assert(locator!=null)
                 val (pos, len) = locator
                 val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
                 log_ref_decrement(pos, log_info)
@@ -890,8 +915,13 @@ class LevelDBClient(store: LevelDBStore)
           val (_, locator, callback) = x
           val record = metric_load_from_index_counter.time {
             val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
-            log.read(pos, len).map { data =>
-              val rc = PBSupport.decode_message_record(data)
+            log.read(pos, len).map { case (kind, data) =>
+
+              val msg_data = kind match {
+                case LOG_ADD_MESSAGE => data
+                case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+              }
+              val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
               rc.locator = locator
               assert( rc.protocol!=null )
               rc
@@ -919,9 +949,14 @@ class LevelDBClient(store: LevelDBStore)
           val (_, locator, callback) = x
           val record:Option[MessageRecord] = metric_load_from_index_counter.time {
             val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
-            log.read(pos, len).map { x =>
-              val rc:MessageRecord = PBSupport.decode_message_record(x)
+            log.read(pos, len).map { case (kind, data) =>
+              val msg_data = kind match {
+                case LOG_ADD_MESSAGE => data
+                case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+              }
+              val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
               rc.locator = locator
+              assert( rc.protocol!=null )
               rc
             }
           }
@@ -951,7 +986,7 @@ class LevelDBClient(store: LevelDBStore)
       ro.fillCache(false)
       ro.verifyChecksums(verify_checksums)
       index.get(encode_key(queue_prefix, queue_key), ro).map{ x=>
-        PBSupport.decode_queue_record(x)
+        PBSupport.from_pb(QueuePB.FACTORY.parseUnframed(x))
       }
     }
   }
@@ -1182,9 +1217,12 @@ class LevelDBClient(store: LevelDBStore)
           index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
             val (_, pos) = decode_long_key(key)
             val len = decode_vlong(value).toInt
-            log.read(pos, len).foreach { value =>
-              // Set the message key to be the position in the log.
-              val record = MessagePB.FACTORY.parseUnframed(value).copy
+            log.read(pos, len).foreach { case (kind, data) =>
+              val msg_data = kind match {
+                case LOG_ADD_MESSAGE => data
+                case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+              }
+              val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
               record.setMessageKey(pos)
               manager.store_message(record)
             }
@@ -1253,8 +1291,18 @@ class LevelDBClient(store: LevelDBStore)
           while(manager.getNext match {
 
             case record:MessagePB.Buffer =>
-              val message_data = record.toUnframedBuffer
-              val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
+              var message_data = record.toUnframedBuffer
+              val (pos, _) = if( snappy_compress_logs ) {
+                val compressed = Snappy.compress(message_data)
+                if( compressed.length < message_data.length ) {
+                  message_data = compressed
+                  appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+                } else {
+                  appender.append(LOG_ADD_MESSAGE, message_data)
+                }
+              } else {
+                appender.append(LOG_ADD_MESSAGE, message_data)
+              }
               index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
               true
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Sat Feb 18 14:49:24 2012
@@ -227,9 +227,12 @@ case class RecordLog(directory: File, lo
       
       check_read_flush(offset+LOG_HEADER_SIZE+length)
       
-      if(verify_checksums) {
+      val record = new Buffer(LOG_HEADER_SIZE+length)
+      if( channel.read(record.toByteBuffer, offset) != record.length ) {
+        throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
+      }
 
-        val record = new Buffer(LOG_HEADER_SIZE+length)
+      if(verify_checksums) {
 
         def record_is_not_changing = {
           using(open) { fd =>
@@ -242,10 +245,6 @@ case class RecordLog(directory: File, lo
           }
         }
 
-        if( channel.read(record.toByteBuffer, offset) != record.length ) {
-          assert( record_is_not_changing )
-          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
-        }
 
         val is = new DataByteArrayInputStream(record)
         val prefix = is.readByte()
@@ -254,7 +253,7 @@ case class RecordLog(directory: File, lo
           throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset)
         }
 
-        val id = is.readByte()
+        val kind = is.readByte()
         val expectedChecksum = is.readInt()
         val expectedLength = is.readInt()
         val data = is.readBuffer(length)
@@ -267,13 +266,11 @@ case class RecordLog(directory: File, lo
           }
         }
 
-        data
+        (kind, data)
       } else {
-        val data = new Buffer(length)
-        if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
-          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
-        }
-        data
+        val kind = record.get(1)
+        record.moveHead(LOG_HEADER_SIZE)
+        (kind, record)
       }
     }
 



Mime
View raw message