activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1205507 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src: main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
Date Wed, 23 Nov 2011 17:41:14 GMT
Author: chirino
Date: Wed Nov 23 17:41:12 2011
New Revision: 1205507

URL: http://svn.apache.org/viewvc?rev=1205507&view=rev
Log:
Make sure we test against the pure java version of the leveldb implementation.

Added:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1205507&r1=1205506&r2=1205507&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala Wed Nov 23 17:41:12 2011
@@ -17,24 +17,25 @@
 package org.apache.activemq.apollo.broker.store.leveldb
 
 import org.fusesource.hawtbuf._
-import java.util.concurrent.TimeUnit
 import org.iq80.leveldb._
-import org.fusesource.leveldbjni.JniDBFactory._
-import java.io.{DataOutput, DataOutputStream}
+import java.io.DataOutput
 
 object HelperTrait {
 
   def encode(a1:Long):Array[Byte] = {
-    val out = new DataByteArrayOutputStream(
-      AbstractVarIntSupport.computeVarLongSize(a1)
-    )
-    out.writeVarLong(a1)
+//    val out = new DataByteArrayOutputStream(
+//      AbstractVarIntSupport.computeVarLongSize(a1)
+//    )
+//    out.writeVarLong(a1)
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
     out.getData
   }
 
   def decode_long(bytes:Array[Byte]):Long = {
     val in = new DataByteArrayInputStream(bytes)
-    in.readVarLong()
+//    in.readVarLong()
+    in.readLong()
   }
 
   def encode(a1:Byte, a2:Long):Array[Byte] = {
@@ -83,6 +84,8 @@ object HelperTrait {
 
   final class RichDB(val db: DB) {
 
+    val is_pure_java_version = db.getClass.getName == "org.iq80.leveldb.impl.DbImpl"
+    
     def getProperty(name:String) = db.getProperty(name)
 
     def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
@@ -173,7 +176,11 @@ object HelperTrait {
       iterator.seek(start_included);
       try {
         def check(key:Array[Byte]) = {
-          (compare(key,end_excluded) < 0) && func(key)
+          if ( compare(key,end_excluded) < 0) {
+            func(key)
+          } else {
+            false
+          }
         }
         while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
           iterator.next()
@@ -199,32 +206,44 @@ object HelperTrait {
     }
 
     def last_key(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
-      val copy = new Buffer(prefix).deepCopy().data
-      if ( copy.length > 0 ) {
-        val pos = copy.length-1
-        copy(pos) = (copy(pos)+1).toByte
-      }
-      val iterator = db.iterator(ro)
-      try {
-        iterator.seek(copy);
-        if ( iterator.hasPrev ) {
-          iterator.prev()
-        } else {
-          iterator.seekToLast()
-        }
+      val last = new Buffer(prefix).deepCopy().data
+      if ( last.length > 0 ) {
+        val pos = last.length-1
+        last(pos) = (last(pos)+1).toByte
+      }
+
+      if(is_pure_java_version) {
+        // The pure java version of LevelDB does not support backward iteration.
+        var rc:Option[Array[Byte]] = None
+        cursor_range_keys(prefix, last) { key=>
+          rc = Some(key)
+          true
+        }
+        rc
+      } else {
+        val iterator = db.iterator(ro)
+        try {
         
-        if ( iterator.hasNext ) {
-          val key = iterator.peekNext.getKey
-          if(key.startsWith(prefix)) {
-            Some(key)
+          iterator.seek(last);
+          if ( iterator.hasPrev ) {
+            iterator.prev()
+          } else {
+            iterator.seekToLast()
+          }
+
+          if ( iterator.hasNext ) {
+            val key = iterator.peekNext.getKey
+            if(key.startsWith(prefix)) {
+              Some(key)
+            } else {
+              None
+            }
           } else {
             None
-          } 
-        } else {
-          None
+          }
+        } finally {
+          iterator.close();
         }
-      } finally {
-        iterator.close();
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1205507&r1=1205506&r2=1205507&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Wed Nov 23 17:41:12 2011
@@ -444,7 +444,7 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def addQueue(record: QueueRecord, callback:Runnable) = {
+  def add_queue(record: QueueRecord, callback:Runnable) = {
     retry_using_index {
       log.appender { appender =>
         appender.append(LOG_ADD_QUEUE, record)
@@ -454,7 +454,7 @@ class LevelDBClient(store: LevelDBStore)
     callback.run
   }
 
-  def removeQueue(queue_key: Long, callback:Runnable) = {
+  def remove_queue(queue_key: Long, callback:Runnable) = {
     retry_using_index {
       log.appender { appender =>
         val ro = new ReadOptions
@@ -614,7 +614,7 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def listQueues: Seq[Long] = {
+  def list_queues: Seq[Long] = {
     val rc = ListBuffer[Long]()
     retry_using_index {
       val ro = new ReadOptions
@@ -628,7 +628,7 @@ class LevelDBClient(store: LevelDBStore)
     rc
   }
 
-  def getQueue(queue_key: Long): Option[QueueRecord] = {
+  def get_queue(queue_key: Long): Option[QueueRecord] = {
     retry_using_index {
       val ro = new ReadOptions
       ro.fillCache(false)
@@ -715,7 +715,7 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def getLastQueueKey:Long = {
+  def get_last_queue_key:Long = {
     retry_using_index {
       index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
     }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1205507&r1=1205506&r2=1205507&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Wed Nov 23 17:41:12 2011
@@ -102,7 +102,7 @@ class LevelDBStore(val config:LevelDBSto
         try {
           client.start()
           next_msg_key.set(client.getLastMessageKey + 1)
-          next_queue_key.set(client.getLastQueueKey + 1)
+          next_queue_key.set(client.get_last_queue_key + 1)
           poll_gc
           on_completed.run
         } catch {
@@ -188,31 +188,31 @@ class LevelDBStore(val config:LevelDBSto
    */
   def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
     write_executor {
-      callback(Some(client.getLastQueueKey))
+      callback(Some(client.get_last_queue_key))
     }
   }
 
   def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     write_executor {
-     client.addQueue(record, ^{ callback(true) })
+     client.add_queue(record, ^{ callback(true) })
     }
   }
 
   def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
     write_executor {
-      client.removeQueue(queueKey,^{ callback(true) })
+      client.remove_queue(queueKey,^{ callback(true) })
     }
   }
 
   def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
     write_executor {
-      callback( client.getQueue(queueKey) )
+      callback( client.get_queue(queueKey) )
     }
   }
 
   def list_queues(callback: (Seq[Long]) => Unit) = {
     write_executor {
-      callback( client.listQueues )
+      callback( client.list_queues )
     }
   }
 

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala?rev=1205507&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala Wed Nov 23 17:41:12 2011
@@ -0,0 +1,39 @@
+package org.apache.activemq.apollo.broker.store.leveldb.leveldb
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.broker.store.leveldb.LevelDBStore
+import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.util.FileSupport._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class PureJavaLevelDBStoreTest extends StoreFunSuiteSupport {
+
+  def create_store(flushDelay:Long):Store = {
+    new LevelDBStore({
+      val rc = new LevelDBStoreDTO
+      rc.index_factory = "org.iq80.leveldb.impl.Iq80DBFactory"
+      rc.directory = basedir / "target" / "apollo-data"
+      rc.flush_delay = flushDelay
+      rc
+    })
+  }
+
+}



Mime
View raw message