activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r1291054 [2/2] - in /activemq/activemq-apollo/trunk/apollo-leveldb/src: main/scala/org/apache/activemq/apollo/broker/store/leveldb/ main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ test/scala/org/apache/activemq/apollo/br...
Date: Sun, 19 Feb 2012 20:24:16 GMT
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,10 +16,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
-import java.{lang=>jl}
-import java.{util=>ju}
+import java.{lang => jl}
+import java.{util => ju}
 
 import java.util.zip.CRC32
 import java.util.Map.Entry
@@ -42,56 +43,59 @@ object RecordLog extends Log {
 
   val LOG_HEADER_PREFIX = '*'.toByte
   val UOW_END_RECORD = -1.toByte
-  
+
   val LOG_HEADER_SIZE = 10
 
-  val BUFFER_SIZE = 1024*512
-  val BYPASS_BUFFER_SIZE = 1024*16
+  val BUFFER_SIZE = 1024 * 512
+  val BYPASS_BUFFER_SIZE = 1024 * 16
 
-  case class LogInfo(file:File, position:Long, length:Long) {
-    def limit = position+length
+  case class LogInfo(file: File, position: Long, length: Long) {
+    def limit = position + length
   }
-  
-  def encode_long(a1:Long) = {
+
+  def encode_long(a1: Long) = {
     val out = new DataByteArrayOutputStream(8)
     out.writeLong(a1)
     out.toBuffer
   }
 
-  def decode_long(value:Buffer):Long = {
+  def decode_long(value: Buffer): Long = {
     val in = new DataByteArrayInputStream(value)
     in.readLong()
   }
-  
+
 }
 
-case class RecordLog(directory: File, logSuffix:String) {
+case class RecordLog(directory: File, logSuffix: String) {
+
   import RecordLog._
 
   directory.mkdirs()
 
   var logSize = 1024 * 1024 * 100L
-  var current_appender:LogAppender = _
+  var current_appender: LogAppender = _
   var verify_checksums = false
   var sync = false
 
 
   val log_infos = new TreeMap[Long, LogInfo]()
+
   object log_mutex
 
-  def delete(id:Long) = {
+  def delete(id: Long) = {
     log_mutex.synchronized {
       // We can't delete the current appender.
-      if( current_appender.position != id ) {
-        Option(log_infos.get(id)).foreach { info =>
-          onDelete(info.file)
-          log_infos.remove(id)
+      if (current_appender.position != id) {
+        Option(log_infos.get(id)).foreach {
+          info =>
+            onDelete(info.file)
+            log_infos.remove(id)
         }
       }
     }
   }
 
-  protected def onDelete(file:File) = {
+  protected def onDelete(file: File) = {
     file.delete()
   }
 
@@ -101,7 +105,7 @@ case class RecordLog(directory: File, lo
     (checksum.getValue & 0xFFFFFFFF).toInt
   }
 
-  class LogAppender(file:File, position:Long) extends LogReader(file, position) {
+  class LogAppender(file: File, position: Long) extends LogReader(file, position) {
 
     val info = new LogInfo(file, position, 0)
 
@@ -116,23 +120,23 @@ case class RecordLog(directory: File, lo
     val flushed_offset = new AtomicLong(0)
 
     def append_position = {
-      position+append_offset
+      position + append_offset
     }
 
     // set the file size ahead of time so that we don't have to sync the file
     // meta-data on every log sync.
-    channel.position(logSize-1)
+    channel.position(logSize - 1)
     channel.write(new Buffer(1).toByteBuffer)
     channel.force(true)
-    if( sync ) {
+    if (sync) {
       channel.position(0)
     }
 
-    val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
+    val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE + LOG_HEADER_SIZE)
 
     def force = {
       flush
-      if(sync) {
+      if (sync) {
         // only need to update the file metadata if the file size changes..
         channel.force(append_offset > logSize)
       }
@@ -141,19 +145,19 @@ case class RecordLog(directory: File, lo
     /**
      * returns the offset position of the data record.
      */
-    def append(id:Byte, data: Buffer) = this.synchronized {
+    def append(id: Byte, data: Buffer) = this.synchronized {
       val record_position = append_position
       val data_length = data.length
       val total_length = LOG_HEADER_SIZE + data_length
-      
-      if( write_buffer.position() + total_length > BUFFER_SIZE ) {
+
+      if (write_buffer.position() + total_length > BUFFER_SIZE) {
         flush
       }
 
       val cs: Int = checksum(data)
-//      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
+      //      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
 
-      if( false && total_length > BYPASS_BUFFER_SIZE ) {
+      if (false && total_length > BYPASS_BUFFER_SIZE) {
 
         // Write the header and flush..
         write_buffer.writeByte(LOG_HEADER_PREFIX)
@@ -166,10 +170,10 @@ case class RecordLog(directory: File, lo
 
         // Directly write the data to the channel since it's large.
         val buffer = data.toByteBuffer
-        val pos = append_offset+LOG_HEADER_SIZE
+        val pos = append_offset + LOG_HEADER_SIZE
         flushed_offset.addAndGet(buffer.remaining)
         channel.write(buffer, pos)
-        if( buffer.hasRemaining ) {
+        if (buffer.hasRemaining) {
           throw new IOException("Short write")
         }
         append_offset += data_length
@@ -182,24 +186,24 @@ case class RecordLog(directory: File, lo
         write_buffer.write(data.data, data.offset, data_length)
         append_offset += total_length
       }
-      (record_position,info)
+      (record_position, info)
     }
 
     def flush = this.synchronized {
-      if( write_buffer.position() > 0 ) {
+      if (write_buffer.position() > 0) {
         val buffer = write_buffer.toBuffer.toByteBuffer
-        val pos = append_offset-buffer.remaining
+        val pos = append_offset - buffer.remaining
         flushed_offset.addAndGet(buffer.remaining)
         channel.write(buffer, pos)
-        if( buffer.hasRemaining ) {
+        if (buffer.hasRemaining) {
           throw new IOException("Short write")
         }
         write_buffer.reset()
       }
     }
 
-    override def check_read_flush(end_offset:Long) = {
-      if( flushed_offset.get() < end_offset )  {
+    override def check_read_flush(end_offset: Long) = {
+      if (flushed_offset.get() < end_offset) {
         this.synchronized {
           flush
         }
@@ -208,7 +212,7 @@ case class RecordLog(directory: File, lo
 
   }
 
-  case class LogReader(file:File, position:Long) extends BaseRetained {
+  case class LogReader(file: File, position: Long) extends BaseRetained {
 
     def open = new RandomAccessFile(file, "r")
 
@@ -219,38 +223,39 @@ case class RecordLog(directory: File, lo
       fd.close()
     }
 
-    def check_read_flush(end_offset:Long) = {}
-    
-    def read(record_position:Long, length:Int) = {
-      val offset = record_position-position
-      assert(offset >=0 )
-      
-      check_read_flush(offset+LOG_HEADER_SIZE+length)
-      
-      val record = new Buffer(LOG_HEADER_SIZE+length)
-      if( channel.read(record.toByteBuffer, offset) != record.length ) {
-        throw new IOException("short record at position: "+record_position+" in file: "+file+",
offset: "+offset)
+    def check_read_flush(end_offset: Long) = {}
+
+    def read(record_position: Long, length: Int) = {
+      val offset = record_position - position
+      assert(offset >= 0)
+
+      check_read_flush(offset + LOG_HEADER_SIZE + length)
+
+      val record = new Buffer(LOG_HEADER_SIZE + length)
+      if (channel.read(record.toByteBuffer, offset) != record.length) {
+        throw new IOException("short record at position: " + record_position + " in file:
" + file + ", offset: " + offset)
       }
 
-      if(verify_checksums) {
+      if (verify_checksums) {
 
         def record_is_not_changing = {
-          using(open) { fd =>
-            val channel = fd.getChannel
-            val new_record = new Buffer(LOG_HEADER_SIZE+length)
-            channel.read(new_record.toByteBuffer, offset)
-            var same = record == new_record
-            println(same)
-            same
+          using(open) {
+            fd =>
+              val channel = fd.getChannel
+              val new_record = new Buffer(LOG_HEADER_SIZE + length)
+              channel.read(new_record.toByteBuffer, offset)
+              var same = record == new_record
+              println(same)
+              same
           }
         }
 
 
         val is = new DataByteArrayInputStream(record)
         val prefix = is.readByte()
-        if( prefix != LOG_HEADER_PREFIX ) {
+        if (prefix != LOG_HEADER_PREFIX) {
           assert(record_is_not_changing)
-          throw new IOException("invalid record at position: "+record_position+" in file:
"+file+", offset: "+offset)
+          throw new IOException("invalid record at position: " + record_position + " in file:
" + file + ", offset: " + offset)
         }
 
         val kind = is.readByte()
@@ -259,10 +264,10 @@ case class RecordLog(directory: File, lo
         val data = is.readBuffer(length)
 
         // If your reading the whole record we can verify the data checksum
-        if( expectedLength == length ) {
-          if( expectedChecksum != checksum(data) ) {
+        if (expectedLength == length) {
+          if (expectedChecksum != checksum(data)) {
             assert(record_is_not_changing)
-            throw new IOException("checksum does not match at position: "+record_position+"
in file: "+file+", offset: "+offset)
+            throw new IOException("checksum does not match at position: " + record_position
+ " in file: " + file + ", offset: " + offset)
           }
         }
 
@@ -274,13 +279,13 @@ case class RecordLog(directory: File, lo
       }
     }
 
-    def read(record_position:Long) = {
-      val offset = record_position-position
+    def read(record_position: Long) = {
+      val offset = record_position - position
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
       val prefix = is.readByte()
-      if( prefix != LOG_HEADER_PREFIX ) {
+      if (prefix != LOG_HEADER_PREFIX) {
         // Does not look like a record.
         throw new IOException("invalid record position")
       }
@@ -289,32 +294,32 @@ case class RecordLog(directory: File, lo
       val length = is.readInt()
       val data = new Buffer(length)
 
-      if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
+      if (channel.read(data.toByteBuffer, offset + LOG_HEADER_SIZE) != length) {
         throw new IOException("short record")
       }
 
-      if(verify_checksums) {
-        if( expectedChecksum != checksum(data) ) {
+      if (verify_checksums) {
+        if (expectedChecksum != checksum(data)) {
           throw new IOException("checksum does not match")
         }
       }
-      (id, data, record_position+LOG_HEADER_SIZE+length)
+      (id, data, record_position + LOG_HEADER_SIZE + length)
     }
 
-    def check(record_position:Long):Option[(Long, Option[Long])] = {
-      var offset = record_position-position
+    def check(record_position: Long): Option[(Long, Option[Long])] = {
+      var offset = record_position - position
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
       val prefix = is.readByte()
-      if( prefix != LOG_HEADER_PREFIX ) {
+      if (prefix != LOG_HEADER_PREFIX) {
         return None // Does not look like a record.
       }
       val kind = is.readByte()
       val expectedChecksum = is.readInt()
       val length = is.readInt()
 
-      val chunk = new Buffer(1024*4)
+      val chunk = new Buffer(1024 * 4)
       val chunkbb = chunk.toByteBuffer
       offset += LOG_HEADER_SIZE
 
@@ -323,12 +328,12 @@ case class RecordLog(directory: File, lo
       // with a bad record length
       val checksumer = new CRC32
       var remaining = length
-      while( remaining > 0 ) {
-        val chunkSize = remaining.min(1024*4);
+      while (remaining > 0) {
+        val chunkSize = remaining.min(1024 * 4);
         chunkbb.position(0)
         chunkbb.limit(chunkSize)
         channel.read(chunkbb, offset)
-        if( chunkbb.hasRemaining ) {
+        if (chunkbb.hasRemaining) {
           return None
         }
         checksumer.update(chunk.data, 0, chunkSize)
@@ -336,27 +341,28 @@ case class RecordLog(directory: File, lo
         remaining -= chunkSize
       }
 
-      val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt
-      if( expectedChecksum !=  checksum ) {
+      val checksum = (checksumer.getValue & 0xFFFFFFFF).toInt
+      if (expectedChecksum != checksum) {
         return None
       }
-      val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk)) else None
-      return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
+      val uow_start_pos = if (kind == UOW_END_RECORD && length == 8) Some(decode_long(chunk)) else None
+      return Some(record_position + LOG_HEADER_SIZE + length, uow_start_pos)
     }
 
-    def verifyAndGetEndPosition:Long = {
+    def verifyAndGetEndPosition: Long = {
       var pos = position;
       var current_uow_start = pos
-      val limit = position+channel.size()
-      while(pos < limit) {
+      val limit = position + channel.size()
+      while (pos < limit) {
         check(pos) match {
           case Some((next, uow_start_pos)) =>
-            uow_start_pos.foreach { uow_start_pos => 
-              if( uow_start_pos == current_uow_start ) {
-                current_uow_start = next
-              } else {
-                return current_uow_start
-              }
+            uow_start_pos.foreach {
+              uow_start_pos =>
+                if (uow_start_pos == current_uow_start) {
+                  current_uow_start = next
+                } else {
+                  return current_uow_start
+                }
             }
             pos = next
           case None =>
@@ -373,8 +379,8 @@ case class RecordLog(directory: File, lo
 
   def create_appender(position: Long): Any = {
     log_mutex.synchronized {
-      if(current_appender!=null) {
-        log_infos.put (position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset))
+      if (current_appender != null) {
+        log_infos.put(position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset))
       }
       current_appender = create_log_appender(position)
       log_infos.put(position, current_appender.info)
@@ -384,11 +390,12 @@ case class RecordLog(directory: File, lo
   def open = {
     log_mutex.synchronized {
       log_infos.clear()
-      LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
-        log_infos.put(position, LogInfo(file, position, file.length()))
+      LevelDBClient.find_sequence_files(directory, logSuffix).foreach {
+        case (position, file) =>
+          log_infos.put(position, LogInfo(file, position, file.length()))
       }
 
-      val appendPos = if( log_infos.isEmpty ) {
+      val appendPos = if (log_infos.isEmpty) {
         0L
       } else {
         val file = log_infos.lastEntry().getValue
@@ -397,9 +404,9 @@ case class RecordLog(directory: File, lo
           val actualLength = r.verifyAndGetEndPosition
           val updated = file.copy(length = actualLength - file.position)
           log_infos.put(updated.position, updated)
-          if( updated.file.length != file.length ) {
+          if (updated.file.length != file.length) {
             // we need to truncate.
-            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+            using(new RandomAccessFile(file.file, "rw"))(_.setLength(updated.length))
           }
           actualLength
         } finally {
@@ -418,23 +425,24 @@ case class RecordLog(directory: File, lo
   }
 
   def appender_limit = current_appender.append_position
+
   def appender_start = current_appender.position
 
-  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
+  def next_log(position: Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
 
-  def appender[T](func: (LogAppender)=>T):T= {
+  def appender[T](func: (LogAppender) => T): T = {
     val intial_position = current_appender.append_position
     try {
       val rc = func(current_appender)
-      if( current_appender.append_position != intial_position ) {
+      if (current_appender.append_position != intial_position) {
         // Record a UOW_END_RECORD so that on recovery we only replay full units of work.
-        current_appender.append(UOW_END_RECORD,encode_long(intial_position))
+        current_appender.append(UOW_END_RECORD, encode_long(intial_position))
       }
       rc
     } finally {
       current_appender.flush
       log_mutex.synchronized {
-        if ( current_appender.append_offset >= logSize ) {
+        if (current_appender.append_offset >= logSize) {
           current_appender.release()
           on_log_rotate()
           create_appender(current_appender.append_position)
@@ -443,7 +451,7 @@ case class RecordLog(directory: File, lo
     }
   }
 
-  var on_log_rotate: ()=>Unit = ()=>{}
+  var on_log_rotate: () => Unit = () => {}
 
   private val reader_cache = new LRUCache[File, LogReader](100) {
     protected override def onCacheEviction(entry: Entry[File, LogReader]) = {
@@ -451,51 +459,56 @@ case class RecordLog(directory: File, lo
     }
   }
 
-  def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) }
+  def log_info(pos: Long) = log_mutex.synchronized {
+    Option(log_infos.floorEntry(pos)).map(_.getValue)
+  }
 
-  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
+  private def get_reader[T](record_position: Long)(func: (LogReader) => T) = {
 
     val lookup = log_mutex.synchronized {
       val info = log_info(record_position)
-      info.map { info=>
-        if(info.position == current_appender.position) {
-          current_appender.retain()
-          (info, current_appender)
-        } else {
-          (info, null)
-        }
+      info.map {
+        info =>
+          if (info.position == current_appender.position) {
+            current_appender.retain()
+            (info, current_appender)
+          } else {
+            (info, null)
+          }
       }
     }
 
-    lookup.map { case (info, appender) =>
-      val reader = if( appender!=null ) {
-        // read from the current appender.
-        appender
-      } else {
-        // Checkout a reader from the cache...
-        reader_cache.synchronized {
-          var reader = reader_cache.get(info.file)
-          if(reader==null) {
-            reader = LogReader(info.file, info.position)
-            reader_cache.put(info.file, reader)
+    lookup.map {
+      case (info, appender) =>
+        val reader = if (appender != null) {
+          // read from the current appender.
+          appender
+        } else {
+          // Checkout a reader from the cache...
+          reader_cache.synchronized {
+            var reader = reader_cache.get(info.file)
+            if (reader == null) {
+              reader = LogReader(info.file, info.position)
+              reader_cache.put(info.file, reader)
+            }
+            reader.retain()
+            reader
           }
-          reader.retain()
-          reader
         }
-      }
 
-      try {
-        func(reader)
-      } finally {
-        reader.release
-      }
+        try {
+          func(reader)
+        } finally {
+          reader.release
+        }
     }
   }
 
-  def read(pos:Long) = {
+  def read(pos: Long) = {
     get_reader(pos)(_.read(pos))
   }
-  def read(pos:Long, length:Int) = {
+
+  def read(pos: Long, length: Int) = {
     get_reader(pos)(_.read(pos, length))
   }
 
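For readers scanning the diff above: LogAppender.append frames every entry with a fixed 10-byte header (the '*' prefix byte, a one-byte record kind, the CRC32 of the payload, and the payload length), followed by the payload itself; LOG_HEADER_SIZE and the checksum verification in LogReader.read both rely on that layout. The following is a minimal sketch of the framing, not part of the commit; the object and method names and the use of plain java.io streams are assumptions made only for illustration.

    // Sketch only: mirrors the header that LogAppender.append writes before each payload.
    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.util.zip.CRC32

    object RecordLayoutSketch {
      val LOG_HEADER_PREFIX = '*'.toByte
      val LOG_HEADER_SIZE = 10 // prefix(1) + kind(1) + checksum(4) + length(4)

      def encode_record(kind: Byte, data: Array[Byte]): Array[Byte] = {
        val crc = new CRC32
        crc.update(data)
        val bytes = new ByteArrayOutputStream(LOG_HEADER_SIZE + data.length)
        val out = new DataOutputStream(bytes)
        out.writeByte(LOG_HEADER_PREFIX)                 // record marker
        out.writeByte(kind)                              // record kind, e.g. UOW_END_RECORD (-1)
        out.writeInt((crc.getValue & 0xFFFFFFFF).toInt)  // CRC32 of the payload
        out.writeInt(data.length)                        // payload length
        out.write(data)                                  // payload bytes
        out.flush()
        bytes.toByteArray
      }
    }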

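The UOW_END_RECORD handling is the other half of the picture: RecordLog.appender writes a UOW_END_RECORD carrying the batch's starting position after every unit of work, and verifyAndGetEndPosition walks the log with check(pos), advancing the recovered end position only past markers that point back at the start of the unit currently being scanned, so a partially written batch is truncated on open. The sketch below restates that scan against a hypothetical parse function; RecoveryScanSketch, ParsedRecord, and scan_end are illustration-only names, not part of the commit.

    // Sketch only: the recovery scan shape used by verifyAndGetEndPosition.
    object RecoveryScanSketch {
      final case class ParsedRecord(next: Long, uow_start_pos: Option[Long])

      def scan_end(start: Long, limit: Long, parse: Long => Option[ParsedRecord]): Long = {
        var pos = start
        var current_uow_start = start // end of the last complete unit of work seen so far
        while (pos < limit) {
          parse(pos) match {
            case Some(ParsedRecord(next, uow_start_pos)) =>
              uow_start_pos match {
                case Some(s) if s == current_uow_start => current_uow_start = next // batch completed
                case Some(_)                           => return current_uow_start // stale or torn marker
                case None                              =>                          // ordinary data record
              }
              pos = next
            case None => return current_uow_start // unreadable record: stop at the last complete batch
          }
        }
        current_uow_start
      }
    }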
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
 import dto.LevelDBStoreDTO
 import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
@@ -24,7 +25,7 @@ import org.apache.activemq.apollo.broker
  */
 class LevelDBStoreTest extends StoreFunSuiteSupport {
 
-  def create_store(flushDelay:Long):Store = {
+  def create_store(flushDelay: Long): Store = {
     new LevelDBStore({
       val rc = new LevelDBStoreDTO
       rc.directory = data_directory

Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala Sun Feb 19 20:24:15 2012
@@ -1,4 +1,4 @@
-package org.apache.activemq.apollo.broker.store.leveldb.leveldb
+package org.apache.activemq.apollo.broker.store.leveldb
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -16,6 +16,7 @@ package org.apache.activemq.apollo.broke
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
 import org.apache.activemq.apollo.broker.store.leveldb.LevelDBStore
 import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO
@@ -26,7 +27,7 @@ import org.apache.activemq.apollo.util.F
  */
 class PureJavaLevelDBStoreTest extends StoreFunSuiteSupport {
 
-  def create_store(flushDelay:Long):Store = {
+  def create_store(flushDelay: Long): Store = {
     new LevelDBStore({
       val rc = new LevelDBStoreDTO
       rc.index_factory = "org.iq80.leveldb.impl.Iq80DBFactory"

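The two copied test classes differ only in how create_store configures the DTO: LevelDBStoreTest relies on the default (JNI-based) LevelDB binding, while PureJavaLevelDBStoreTest points index_factory at the pure-Java org.iq80.leveldb implementation so the suite can also run where the native library is unavailable. Below is a minimal sketch of that configuration, assuming the constructor and DTO fields shown in the diff; the helper name and the java.io.File parameter are illustrative only.

    // Sketch only: build a LevelDB store backed by the pure-Java index factory.
    import java.io.File
    import org.apache.activemq.apollo.broker.store.leveldb.LevelDBStore
    import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO

    object PureJavaStoreSketch {
      def pure_java_store(directory: File): LevelDBStore = {
        val rc = new LevelDBStoreDTO
        rc.directory = directory                                  // where the log and index files live
        rc.index_factory = "org.iq80.leveldb.impl.Iq80DBFactory"  // pure-Java LevelDB implementation
        new LevelDBStore(rc)
      }
    }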

