activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r1205020 [2/2] - in /activemq/activemq-apollo/trunk: ./ apollo-leveldb/ apollo-leveldb/src/ apollo-leveldb/src/main/ apollo-leveldb/src/main/resources/ apollo-leveldb/src/main/resources/META-INF/ apollo-leveldb/src/main/resources/META-INF/s...
Date Tue, 22 Nov 2011 14:40:21 GMT
Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.{LevelDBStoreDTO, LevelDBStoreStatusDTO}
+import collection.Seq
+import org.fusesource.hawtdispatch._
+import java.util.concurrent._
+import atomic.{AtomicReference, AtomicLong}
+import org.apache.activemq.apollo.broker.store._
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch.ListEventAggregator
+import org.apache.activemq.apollo.dto.StoreStatusDTO
+import org.apache.activemq.apollo.util.OptionSupport._
+import scala.util.continuations._
+import java.io._
+import org.apache.activemq.apollo.web.resources.ViewHelper
+import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.Buffer
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBStore extends Log {
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBStore(val config:LevelDBStoreDTO) extends DelayingStoreSupport {
+
+  var next_queue_key = new AtomicLong(1)
+  var next_msg_key = new AtomicLong(1)
+
+  var write_executor:ExecutorService = _
+  var gc_executor:ExecutorService = _
+  var read_executor:ExecutorService = _
+
+  var client:LevelDBClient = _
+  def create_client = new LevelDBClient(this)
+
+
+  def store_kind = "leveldb"
+
+  override def toString = store_kind+" store at "+config.directory
+
+  def flush_delay = config.flush_delay.getOrElse(100)
+  
+  protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+  protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+    write_executor {
+      client.store(uows, ^{
+        dispatch_queue {
+          callback
+        }
+      })
+    }
+  }
+
+  protected def _start(on_completed: Runnable) = {
+    try {
+      client = create_client
+      write_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store io write")
+          rc.setDaemon(true)
+          rc
+        }
+      })
+      gc_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store gc")
+          rc.setDaemon(true)
+          rc
+        }
+      })
+      read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new ThreadFactory() {
+        def newThread(r: Runnable) = {
+          val rc = new Thread(r, store_kind + " store io read")
+          rc.setDaemon(true)
+          rc
+        }
+      })
+      poll_stats
+      write_executor {
+        try {
+          client.start()
+          next_msg_key.set(client.getLastMessageKey + 1)
+          next_queue_key.set(client.getLastQueueKey + 1)
+          poll_gc
+          on_completed.run
+        } catch {
+          case e:Throwable =>
+            e.printStackTrace()
+            LevelDBStore.error(e, "Store client startup failure: "+e)
+        }
+      }
+    }
+    catch {
+      case e:Throwable =>
+        e.printStackTrace()
+        LevelDBStore.error(e, "Store startup failure: "+e)
+    }
+  }
+
+  protected def _stop(on_completed: Runnable) = {
+    new Thread() {
+      override def run = {
+        write_executor.shutdown
+        write_executor.awaitTermination(60, TimeUnit.SECONDS)
+        write_executor = null
+        read_executor.shutdown
+        read_executor.awaitTermination(60, TimeUnit.SECONDS)
+        read_executor = null
+        gc_executor.shutdown
+        client.stop
+        on_completed.run
+      }
+    }.start
+  }
+
+  private def keep_polling = {
+    val ss = service_state
+    ss.is_starting || ss.is_started
+  }
+
+  def poll_gc:Unit = {
+    val interval = config.gc_interval.getOrElse(60*30)
+    if( interval>0 ) {
+      dispatch_queue.after(interval, TimeUnit.SECONDS) {
+        if( keep_polling ) {
+          gc {
+            poll_gc
+          }
+        }
+      }
+    }
+  }
+
+  def gc(onComplete: =>Unit) = gc_executor {
+    client.gc
+    onComplete
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the Store interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  /**
+   * Deletes all stored data from the store.
+   */
+  def purge(callback: =>Unit) = {
+    write_executor {
+      client.purge()
+      next_queue_key.set(1)
+      next_msg_key.set(1)
+      callback
+    }
+  }
+
+
+  def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+    read_executor {
+      callback(client.get(key))
+    }
+  }
+
+  /**
+   * Gets the last queue key identifier stored.
+   */
+  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
+    write_executor {
+      callback(Some(client.getLastQueueKey))
+    }
+  }
+
+  def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+    write_executor {
+     client.addQueue(record, ^{ callback(true) })
+    }
+  }
+
+  def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
+    write_executor {
+      client.removeQueue(queueKey,^{ callback(true) })
+    }
+  }
+
+  def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
+    write_executor {
+      callback( client.getQueue(queueKey) )
+    }
+  }
+
+  def list_queues(callback: (Seq[Long]) => Unit) = {
+    write_executor {
+      callback( client.listQueues )
+    }
+  }
+
+  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)](), dispatch_queue)
+  load_source.setEventHandler(^{drain_loads});
+  load_source.resume
+
+
+  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
+    message_load_latency_counter.start { end=>
+      load_source.merge((messageKey, locator, { (result)=>
+        end()
+        callback(result)
+      }))
+    }
+  }
+
+  def drain_loads = {
+    var data = load_source.getData
+    message_load_batch_size_counter += data.size
+    read_executor ^{
+      client.loadMessages(data)
+    }
+  }
+
+  def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+    write_executor ^{
+      callback( client.listQueueEntryGroups(queueKey, limit) )
+    }
+  }
+
+  def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+    write_executor ^{
+      callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
+    }
+  }
+
+  def poll_stats:Unit = {
+    def displayStats = {
+      if( service_state.is_started ) {
+
+        flush_latency = flush_latency_counter(true)
+        message_load_latency = message_load_latency_counter(true)
+//        client.metric_journal_append = client.metric_journal_append_counter(true)
+//        client.metric_index_update = client.metric_index_update_counter(true)
+        commit_latency = commit_latency_counter(true)
+        message_load_batch_size =  message_load_batch_size_counter(true)
+
+        poll_stats
+      }
+    }
+
+    dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+  }
+
+  def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
+    val rc = new LevelDBStoreStatusDTO
+    fill_store_status(rc)
+    rc.message_load_batch_size = message_load_batch_size
+    write_executor {
+      client.using_index {
+        rc.index_stats = client.index.getProperty("leveldb.stats")
+        rc.log_append_pos = client.log.appender_limit
+        rc.index_snapshot_pos = client.last_index_snapshot_pos
+        rc.last_gc_duration = client.last_gc_duration
+        rc.last_gc_ts = client.last_gc_ts
+        rc.in_gc = client.in_gc
+        rc.log_stats = {
+          var row_layout = "%-20s | %-10s | %10s/%-10s\n"
+          row_layout.format("File", "Messages", "Used Size", "Total Size")+
+          client.log.log_infos.map(x=> x._1 -> client.gc_detected_log_usage.get(x._1)).toSeq.flatMap { x=>
+            try {
+              val file = LevelDBClient.create_sequence_file(client.directory, x._1, LevelDBClient.LOG_SUFFIX)
+              val size = file.length()
+              val usage = x._2 match {
+                case Some(usage)=>
+                  (usage.count.toString, ViewHelper.memory(usage.size))
+                case None=>
+                  ("unknown", "unknown")
+              }
+              Some(row_layout.format(file.getName, usage._1, usage._2, ViewHelper.memory(size)))
+            } catch {
+              case e:Throwable =>
+                None
+            }
+          }.mkString("")
+        }
+      }
+      callback(rc)
+    }
+  }
+
+  /**
+   * Exports the contents of the store to the provided streams.  Each stream should contain
+   * a list of framed protobuf objects with the corresponding object types.
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.export_pb(streams)
+  }
+
+  /**
+   * Imports a previously exported set of streams.  This deletes any previous data
+   * in the store.
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.import_pb(streams)
+  }
+
+}
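
A note on the pattern used throughout the class above: write_executor { ... } and read_executor ^{ ... } compile because the org.fusesource.hawtdispatch._ import enriches a plain java.util.concurrent.ExecutorService so that a by-name block can be submitted directly, while ^{ ... } builds a Runnable from a block. A minimal standalone sketch, assuming the hawtdispatch-scala enrichments behave as they are used in LevelDBStore.scala:

    import java.util.concurrent.Executors
    import org.fusesource.hawtdispatch._

    val pool = Executors.newFixedThreadPool(1)

    // By-name block submitted through hawtdispatch's implicit enrichment,
    // as in write_executor { client.store(...) } above.
    pool {
      println("runs on a pool thread")
    }

    // Equivalent form using the ^ Runnable factory,
    // as in client.store(uows, ^{ ... }).
    pool.execute(^{
      println("also runs on a pool thread")
    })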

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.broker.store.StoreFactory
+import org.apache.activemq.apollo.dto.StoreDTO
+import org.apache.activemq.apollo.util._
+
+/**
+ * <p>
+ * Hook to use a LevelDBStore when a LevelDBStoreDTO is
+ * used in a broker configuration.
+ * </p>
+ * <p>
+ * This class is discovered using the following resource file:
+ * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
+ * </p>
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBStoreFactory extends StoreFactory {
+  def create(config: StoreDTO) =  config match {
+    case config:LevelDBStoreDTO =>
+      if( config.getClass == classOf[LevelDBStoreDTO] ) {
+        new LevelDBStore(config)
+      } else {
+        null
+      }
+    case _ => null
+  }
+
+}
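
The factory above is wired in through a classpath index in the spirit of java.util.ServiceLoader: the broker scans the resource named in the class comment across all jars, instantiates each listed factory, and offers it the configured StoreDTO; a factory returns null for configurations it does not own, as the match above does. The index file itself is added elsewhere in this commit; a hypothetical entry for this module (the one-class-per-line syntax is an assumption here):

    # META-INF/services/org.apache.activemq.apollo/stores
    org.apache.activemq.apollo.broker.store.leveldb.LevelDBStoreFactory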

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtbuf.{DataByteArrayOutputStream, AbstractVarIntSupport}
+import java.io._
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.Arrays
+import collection.mutable.{HashMap, HashSet}
+import collection.immutable.TreeMap
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.ByteBuffer
+
+object RecordLog {
+
+  // The log files contain a sequence of variable length log records:
+  // record :=
+  //   '*L'     : int8*2     // 2 byte constant
+  //   checksum : uint32     // crc32 of the data[]
+  //   length   : uint32     // the length of the data
+  //   data     : int8*length
+  //
+  // The log records are used to aggregate multiple data records
+  // as a single write to the file system.
+
+  //
+  // The data is composed of multiple records too:
+  // data :=
+  //   kind     : int8
+  //   length   : varInt
+  //   body     : int8*length
+  //
+  // The kind field is an aid to the app layer.  It cannot be set to
+  // '*'.
+
+  val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
+  val LOG_HEADER_SIZE = 10 // LOG_HEADER_PREFIX (2) + checksum (4) + length (4)
+
+}
+
+case class RecordLog(directory: File, log_suffix:String) {
+  import FileSupport._
+  import RecordLog._
+
+  directory.mkdirs()
+
+  var write_buffer_size = 1024 * 1024 * 4
+  var log_size = 1024 * 1024 * 100
+  private var current_appender:LogAppender = _
+
+  case class LogInfo(file:File, position:Long, length:AtomicLong) {
+    def limit = position+length.get
+  }
+
+  var log_infos = TreeMap[Long, LogInfo]()
+  object log_mutex
+
+  def delete(id:Long) = {
+    log_mutex.synchronized {
+      // We can't delete the current appender.
+      if( current_appender.start != id ) {
+        log_infos.get(id).foreach { info =>
+          on_delete(info.file)
+          log_infos = log_infos.filterNot(_._1 == id)
+        }
+      }
+    }
+  }
+
+  protected def on_delete(file:File) = {
+    file.delete()
+  }
+
+  class LogAppender(val file:File, val start:Long) {
+
+    val fos = new FileOutputStream(file)
+    def channel = fos.getChannel
+    def os:OutputStream = fos
+
+    val outbound = new DataByteArrayOutputStream()
+
+    var batch_length = 0
+    val length = new AtomicLong(0)
+    var limit = start
+
+    // set the file size ahead of time so that we don't have to sync the file
+    // meta-data on every log sync.
+    channel.position(log_size)
+    channel.write(ByteBuffer.wrap(Array(0.toByte)))
+    channel.force(true)
+    channel.position(0)
+
+    def sync = {
+      // only need to update the file metadata if the file size changes..
+      channel.force(length.get() > log_size)
+    }
+
+    def flush {
+      if( batch_length!= 0 ) {
+
+        // Update the buffer with the log header info now that we
+        // can calc the length and checksum info
+        val buffer = outbound.toBuffer
+
+        assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
+
+        outbound.reset()
+        outbound.write(LOG_HEADER_PREFIX)
+
+        val checksum = new CRC32
+        checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE, buffer.length - LOG_HEADER_SIZE)
+        var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+        outbound.writeInt( actual_checksum )
+        outbound.writeInt(batch_length)
+
+        // Actually write the record to the file..
+        buffer.writeTo(os);
+
+        length.addAndGet( buffer.length() )
+
+        batch_length = 0
+        outbound.reset()
+      }
+    }
+
+    /**
+     * returns the offset position of the data record.
+     */
+    def append(id:Byte, data: Array[Byte]): Long = {
+      assert(id != LOG_HEADER_PREFIX(0))
+      if( batch_length!=0 && (batch_length + data.length > write_buffer_size) ) {
+        flush
+      }
+      if( batch_length==0 ) {
+        // first data pos record is offset by the log header.
+        outbound.skip(LOG_HEADER_SIZE);
+        limit += LOG_HEADER_SIZE
+      }
+      val rc = limit;
+
+      val start = outbound.position
+      outbound.writeByte(id);
+      outbound.writeVarInt(data.length)
+      outbound.write(data);
+      val count = outbound.position - start
+
+      limit += count
+      batch_length += count
+      rc
+    }
+
+    def close = {
+      flush
+      channel.truncate(length.get())
+      os.close()
+    }
+  }
+
+  case class LogReader(file:File, start:Long) {
+
+    val is = new RandomAccessFile(file, "r")
+
+    val var_support = new AbstractVarIntSupport {
+      def writeByte(p1: Int) = sys.error("Not supported")
+      def readByte(): Byte = is.readByte()
+    };
+
+    def read(pos:Long) = this.synchronized {
+      is.seek(pos-start)
+      val id = is.read()
+      if( id == LOG_HEADER_PREFIX(0) ) {
+        (id, null, pos+LOG_HEADER_SIZE)
+      } else {
+        val length = var_support.readVarInt()
+        val data = new Array[Byte](length)
+        is.readFully(data)
+        (id, data, is.getFilePointer)
+      }
+    }
+
+    def close = this.synchronized {
+      is.close()
+    }
+
+    def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
+      var offset = 0;
+      val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
+      var done = false
+      while(!done) {
+        try {
+          is.seek(offset)
+          is.readFully(prefix)
+          if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
+            throw new IOException("Missing header prefix");
+          }
+          val expected_checksum = is.readInt();
+
+          val length = is.readInt();
+          if (verify_checksums) {
+            val data = new Array[Byte](length)
+            is.readFully(data)
+
+            val checksum = new CRC32
+            checksum.update(data)
+            val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+            if( expected_checksum != actual_checksum ) {
+              throw new IOException("Data checksum mismatch");
+            }
+          }
+          offset += LOG_HEADER_SIZE + length
+
+        } catch {
+          case e:IOException =>
+            done = true
+        }
+      }
+      start + offset
+    }
+  }
+
+  def create_log_appender(position: Long) = {
+    new LogAppender(next_log(position), position)
+  }
+
+  def create_appender(position: Long): Any = {
+    current_appender = create_log_appender(position)
+    log_mutex.synchronized {
+      log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+    }
+  }
+
+  def open = {
+    log_mutex.synchronized {
+      log_infos = LevelDBClient.find_sequence_files(directory, log_suffix).map { case (position,file) =>
+        position -> LogInfo(file, position, new AtomicLong(file.length()))
+      }
+
+      val append_pos = if( log_infos.isEmpty ) {
+        0L
+      } else {
+        val (_, file) = log_infos.last
+        val r = LogReader(file.file, file.position)
+        try {
+          val rc = r.next_position()
+          file.length.set(rc - file.position)
+          if( file.file.length != file.length.get() ) {
+            // we need to truncate.
+            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+          }
+          rc
+        } finally {
+          r.close
+        }
+      }
+
+      create_appender(append_pos)
+    }
+  }
+  def close = {
+    log_mutex.synchronized {
+      current_appender.close
+    }
+  }
+
+  def appender_limit = current_appender.limit
+  def appender_start = current_appender.start
+
+  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, log_suffix)
+
+  def appender[T](func: (LogAppender)=>T):T= {
+    try {
+      func(current_appender)
+    } finally {
+      current_appender.flush
+      log_mutex.synchronized {
+        if ( current_appender.length.get >= log_size ) {
+          current_appender.close
+          on_log_rotate()
+          create_appender(current_appender.limit)
+        }
+      }
+    }
+  }
+
+  var on_log_rotate: ()=>Unit = ()=>{}
+
+  val next_reader_id = new LongCounter()
+  val reader_cache_files = new HashMap[File, HashSet[Long]];
+  val reader_cache_readers = new LRUCache[Long, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
+      var key = entry.getKey
+      var value = entry.getValue
+      value.close
+
+      val set = reader_cache_files.get(value.file).get
+      set.remove(key)
+      if( set.isEmpty ) {
+        reader_cache_files.remove(value.file)
+      }
+    }
+  }
+
+
+  private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+    val infos = log_mutex.synchronized(log_infos)
+    val info = infos.range(0L, pos+1).lastOption.map(_._2)
+    info.map { info =>
+      // Checkout a reader from the cache...
+      val (set, reader_id, reader) = reader_cache_files.synchronized {
+        var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
+        if( set.isEmpty ) {
+          val reader_id = next_reader_id.getAndIncrement()
+          val reader = new LogReader(info.file, info.position)
+          set.add(reader_id)
+          reader_cache_readers.put(reader_id, reader)
+          (set, reader_id, reader)
+        } else {
+          val reader_id = set.head
+          set.remove(reader_id)
+          (set, reader_id, reader_cache_readers.get(reader_id))
+        }
+      }
+
+      try {
+        func(reader)
+      } finally {
+        // check it back in..
+        reader_cache_files.synchronized {
+          set.add(reader_id)
+        }
+      }
+    }
+  }
+
+  def read(pos:Long) = {
+    get_reader(pos)(_.read(pos))
+  }
+
+}
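
The batch layout documented at the top of RecordLog.scala (a 2-byte '*L' prefix, a crc32 of the data, the data length, then the data itself) can be decoded without the class; the sketch below mirrors the verification loop in LogReader.next_position. This is illustrative only, assuming the file starts with one complete batch; the helper name is made up:

    import java.io.{DataInputStream, FileInputStream}
    import java.util.zip.CRC32

    def read_first_batch(path: String): Array[Byte] = {
      val in = new DataInputStream(new FileInputStream(path))
      try {
        // 2-byte constant prefix: '*L'
        require(in.readByte() == '*'.toByte && in.readByte() == 'L'.toByte, "missing header prefix")
        val expected_checksum = in.readInt() // uint32 crc32 of the data
        val length = in.readInt()            // uint32 length of the data
        val data = new Array[Byte](length)
        in.readFully(data)
        val crc = new CRC32
        crc.update(data)
        require(expected_checksum == (crc.getValue & 0xFFFFFFFFL).toInt, "checksum mismatch")
        data
      } finally {
        in.close()
      }
    }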

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade Tue Nov 22 14:40:18 2011
@@ -0,0 +1,63 @@
+-# ---------------------------------------------------------------------------
+-# Licensed to the Apache Software Foundation (ASF) under one or more
+-# contributor license agreements.  See the NOTICE file distributed with
+-# this work for additional information regarding copyright ownership.
+-# The ASF licenses this file to You under the Apache License, Version 2.0
+-# (the "License"); you may not use this file except in compliance with
+-# the License.  You may obtain a copy of the License at
+-#
+-# http://www.apache.org/licenses/LICENSE-2.0
+-#
+-# Unless required by applicable law or agreed to in writing, software
+-# distributed under the License is distributed on an "AS IS" BASIS,
+-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-# See the License for the specific language governing permissions and
+-# limitations under the License.
+-# ---------------------------------------------------------------------------
+
+- import org.apache.activemq.apollo.dto._
+- import it._
+- val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
+- import helper._
+- import java.util.concurrent.TimeUnit._
+
+.breadcumbs
+  a(href={strip_resolve(".")}) Back
+
+h1 Store: #{id}
+p state: #{state} for #{ uptime(state_since) }
+
+p pending stores: #{pending_stores}
+
+h2 Cancel Stats
+p canceled message stores: #{canceled_message_counter}
+p canceled message enqueues: #{canceled_enqueue_counter}
+
+h2 Flush Stats
+p flushed message stores: #{flushed_message_counter}
+p flushed message enqueues: #{flushed_enqueue_counter}
+
+h2 Store Latency Stats
+
+- def show(name:String, value:TimeMetricDTO)
+  p #{name} : average #{value.avg(MILLISECONDS)} ms,  min #{value.min(MILLISECONDS)} ms, max #{value.max(MILLISECONDS)} ms, #{value.count} events
+
+- show("Message load latency", message_load_latency)
+- show("UOW flush latency", flush_latency)
+
+h2 Log Status
+p last log GC occurred #{uptime(last_gc_ts)}
+p last log GC duration: #{friendly_duration(last_gc_duration)}
+pre
+  !~~ log_stats
+p
+  Index recovery starts from log position:
+  code #{"%016x".format(index_snapshot_pos)}
+p
+  Append position:
+  code #{"%016x".format(log_append_pos)}
+
+h2 Index Status
+pre
+  !~~ index_stats
+

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties Tue Nov 22 14:40:18 2011
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+
+# Console will only display warnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/simple.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/simple.xml?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/simple.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/simple.xml Tue Nov 22 14:40:18 2011
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo"
+        xmlns:fabric="http://fabric.fusesource.org/apollo">
+    <virtual_host enabled="true" id="vh-local">
+        <host_name>localhost</host_name>
+        <fabric:leveldb_store directory="activemq-data"/>
+    </virtual_host>
+</broker>

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/XmlCodecTest.java?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/XmlCodecTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/XmlCodecTest.java Tue Nov 22 14:40:18 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb.dto;
+
+import org.apache.activemq.apollo.dto.BrokerDTO;
+import org.apache.activemq.apollo.dto.VirtualHostDTO;
+import org.apache.activemq.apollo.dto.XmlCodec;
+import org.junit.Test;
+
+import java.io.InputStream;
+
+import static junit.framework.Assert.*;
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+public class XmlCodecTest {
+
+    private InputStream resource(String path) {
+        return getClass().getResourceAsStream(path);
+    }
+
+    @Test
+    public void unmarshalling() throws Exception {
+        BrokerDTO dto = XmlCodec.decode(BrokerDTO.class, resource("simple.xml"));
+        assertNotNull(dto);
+        VirtualHostDTO host = dto.virtual_hosts.get(0);
+        assertEquals("vh-local", host.id);
+        assertEquals("localhost", host.host_names.get(0));
+
+        assertNotNull( host.store );
+        assertTrue( host.store instanceof LevelDBStoreDTO);
+
+    }
+
+
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreBenchmark.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreBenchmark.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreBenchmark.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.broker.store.StoreBenchmarkSupport
+import org.apache.activemq.apollo.broker.store.Store
+import org.apache.activemq.apollo.util.FileSupport._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBStoreBenchmark extends StoreBenchmarkSupport {
+
+  def create_store(flushDelay:Long):Store = {
+    val rc = new LevelDBStore({
+      val rc = new LevelDBStoreDTO
+      rc.directory = basedir / "activemq-data"
+      rc
+    })
+    rc.config.flush_delay = flushDelay
+    rc
+  }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.util.FileSupport._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBStoreTest extends StoreFunSuiteSupport {
+
+  def create_store(flushDelay:Long):Store = {
+    new LevelDBStore({
+      val rc = new LevelDBStoreDTO
+      rc.directory = basedir / "target" / "apollo-data"
+      rc.flush_delay = flushDelay
+      rc
+    })
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1205020&r1=1205019&r2=1205020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Tue Nov 22 14:40:18 2011
@@ -164,6 +164,7 @@
     <module>apollo-broker</module>
     <module>apollo-selector</module>
     <module>apollo-tcp</module>
+    <module>apollo-leveldb</module>
     <module>apollo-bdb</module>
     <module>apollo-jdbm2</module>
     <module>apollo-dto</module>


