activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1205020 [1/2] - in /activemq/activemq-apollo/trunk: ./ apollo-leveldb/ apollo-leveldb/src/ apollo-leveldb/src/main/ apollo-leveldb/src/main/resources/ apollo-leveldb/src/main/resources/META-INF/ apollo-leveldb/src/main/resources/META-INF/s...
Date Tue, 22 Nov 2011 14:40:21 GMT
Author: chirino
Date: Tue Nov 22 14:40:18 2011
New Revision: 1205020

URL: http://svn.apache.org/viewvc?rev=1205020&view=rev
Log:
Adding a leveldb based message store implementation.

Added:
    activemq/activemq-apollo/trunk/apollo-leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
    activemq/activemq-apollo/trunk/apollo-leveldb/src/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/jaxb-module.index
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/store-factory.index
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/HALevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/package-info.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ExtensionJaxbModule.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/log4j.properties   (with props)
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/simple.xml
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/XmlCodecTest.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreBenchmark.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
Modified:
    activemq/activemq-apollo/trunk/pom.xml

Added: activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml Tue Nov 22 14:40:18 2011
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>apollo-scala</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../apollo-scala</relativePath>
+  </parent>
+
+  <groupId>org.fusesource.fabric.apollo</groupId>
+  <artifactId>apollo-leveldb</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>${project.artifactId}</name>
+  <description>LevelDB based message storage for Apollo</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+    <!-- Since we implement a jade template to display the LevelDB status -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-web</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+      <version>${servlet-api-version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Scala Support -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    
+    <!-- Testing Dependencies -->    
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.0</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-util</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>${junit-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      
+      <!-- include all the dependencies into the jar so it can run standalone -->
+      <plugin>
+        <groupId>org.fusesource.mvnplugins</groupId>
+        <artifactId>maven-uberize-plugin</artifactId>
+        <version>1.15</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals><goal>uberize</goal></goals>
+            <configuration>
+              <artifactSet>
+                <includes>
+                  <include>org.fusesource.leveldbjni:leveldbjni-all</include>
+                </includes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      
+      <!-- <plugin>
+        <groupId>org.fusesource.mvnplugins</groupId>
+        <artifactId>maven-fab-plugin</artifactId>
+        <version>${maven-fab-plugin-version}</version>
+        <configuration>
+          <descriptor>
+            <Name>leveldb</Name>
+            <Long-Description></Long-Description>
+            <Extends>${project.groupId}:apollo-cli:${project.version}</Extends>
+          </descriptor>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin> -->
+
+      <plugin>
+        <groupId>org.fusesource.scalate</groupId>
+        <artifactId>maven-scalate-plugin</artifactId>
+        <version>${scalate-version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>precompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      
+    </plugins>
+  </build>
+
+</project>

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/jaxb-module.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/jaxb-module.index?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/jaxb-module.index (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/jaxb-module.index Tue Nov 22 14:40:18 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.store.leveldb.ExtensionJaxbModule
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/store-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/store-factory.index?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/store-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/META-INF/services/org.apache.activemq.apollo/store-factory.index Tue Nov 22 14:40:18 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.store.leveldb.LevelDBStoreFactory

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/jaxb.index?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/resources/org/apache/activemq/apollo/broker/store/leveldb/dto/jaxb.index Tue Nov 22 14:40:18 2011
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+LevelDBStoreDTO
+LevelDBStoreStatusDTO

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/HALevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/HALevelDBStoreDTO.java?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/HALevelDBStoreDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/HALevelDBStoreDTO.java Tue Nov 22 14:40:18 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="haleveldb_store")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class HALevelDBStoreDTO extends LevelDBStoreDTO {
+
+    @XmlAttribute(name = "dfs_url")
+    public String dfs_url;
+
+    @XmlAttribute(name = "dfs_config")
+    public String dfs_config;
+
+    @XmlAttribute(name = "dfs_directory")
+    public String dfs_directory;
+
+    @XmlAttribute(name = "dfs_block_size")
+    public Integer dfs_block_size;
+
+    @XmlAttribute(name = "dfs_replication")
+    public Integer dfs_replication;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        HALevelDBStoreDTO that = (HALevelDBStoreDTO) o;
+
+        if (dfs_directory != null ? !dfs_directory.equals(that.dfs_directory) : that.dfs_directory != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (dfs_directory != null ? dfs_directory.hashCode() : 0);
+        return result;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java Tue Nov 22 14:40:18 2011
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb.dto;
+
+import org.apache.activemq.apollo.dto.StoreDTO;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.File;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="leveldb_store")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LevelDBStoreDTO extends StoreDTO {
+
+    @XmlAttribute
+    public File directory;
+
+    @XmlAttribute(name="gc_interval")
+    public Integer gc_interval;
+
+    @XmlAttribute(name="read_threads")
+    public Integer read_threads;
+
+    @XmlAttribute
+    public Boolean sync;
+
+    @XmlAttribute(name="paranoid_checks")
+    public Boolean paranoid_checks;
+
+    @XmlAttribute(name="verify_checksums")
+    public Boolean verify_checksums;
+
+    @XmlAttribute(name="log_size")
+    public Integer log_size;
+
+    @XmlAttribute(name="log_write_buffer_size")
+    public Integer log_write_buffer_size;
+
+    @XmlAttribute(name="index_max_open_files")
+    public Integer index_max_open_files;
+
+    @XmlAttribute(name="index_block_restart_interval")
+    public Integer index_block_restart_interval;
+
+    @XmlAttribute(name="index_write_buffer_size")
+    public Integer index_write_buffer_size;
+
+    @XmlAttribute(name="index_block_size")
+    public Integer index_block_size;
+
+    @XmlAttribute(name="index_cache_size")
+    public Long index_cache_size;
+
+    @XmlAttribute(name="index_compression")
+    public String index_compression;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof LevelDBStoreDTO)) return false;
+        if (!super.equals(o)) return false;
+
+        LevelDBStoreDTO that = (LevelDBStoreDTO) o;
+
+        if (directory != null ? !directory.equals(that.directory) : that.directory != null)
+            return false;
+        if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval != null)
+            return false;
+        if (index_block_restart_interval != null ? !index_block_restart_interval.equals(that.index_block_restart_interval) : that.index_block_restart_interval != null)
+            return false;
+        if (index_block_size != null ? !index_block_size.equals(that.index_block_size) : that.index_block_size != null)
+            return false;
+        if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
+            return false;
+        if (index_compression != null ? !index_compression.equals(that.index_compression) : that.index_compression != null)
+            return false;
+        if (index_max_open_files != null ? !index_max_open_files.equals(that.index_max_open_files) : that.index_max_open_files != null)
+            return false;
+        if (index_write_buffer_size != null ? !index_write_buffer_size.equals(that.index_write_buffer_size) : that.index_write_buffer_size != null)
+            return false;
+        if (log_size != null ? !log_size.equals(that.log_size) : that.log_size != null)
+            return false;
+        if (log_write_buffer_size != null ? !log_write_buffer_size.equals(that.log_write_buffer_size) : that.log_write_buffer_size != null)
+            return false;
+        if (paranoid_checks != null ? !paranoid_checks.equals(that.paranoid_checks) : that.paranoid_checks != null)
+            return false;
+        if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null)
+            return false;
+        if (sync != null ? !sync.equals(that.sync) : that.sync != null)
+            return false;
+        if (verify_checksums != null ? !verify_checksums.equals(that.verify_checksums) : that.verify_checksums != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (directory != null ? directory.hashCode() : 0);
+        result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
+        result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
+        result = 31 * result + (sync != null ? sync.hashCode() : 0);
+        result = 31 * result + (paranoid_checks != null ? paranoid_checks.hashCode() : 0);
+        result = 31 * result + (verify_checksums != null ? verify_checksums.hashCode() : 0);
+        result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
+        result = 31 * result + (log_write_buffer_size != null ? log_write_buffer_size.hashCode() : 0);
+        result = 31 * result + (index_max_open_files != null ? index_max_open_files.hashCode() : 0);
+        result = 31 * result + (index_block_restart_interval != null ? index_block_restart_interval.hashCode() : 0);
+        result = 31 * result + (index_write_buffer_size != null ? index_write_buffer_size.hashCode() : 0);
+        result = 31 * result + (index_block_size != null ? index_block_size.hashCode() : 0);
+        result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
+        result = 31 * result + (index_compression != null ? index_compression.hashCode() : 0);
+        return result;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java Tue Nov 22 14:40:18 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb.dto;
+
+import org.apache.activemq.apollo.dto.IntMetricDTO;
+import org.apache.activemq.apollo.dto.StoreStatusDTO;
+import org.apache.activemq.apollo.dto.TimeMetricDTO;
+import org.apache.activemq.apollo.dto.WebAdminDTO;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Status snapshot DTO for the LevelDB message store, serialized with JAXB
+ * for the broker's management APIs.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="leveldb_store_status")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LevelDBStoreStatusDTO extends StoreStatusDTO {
+
+    // Latency metric for appends to the write-ahead log/journal.
+    @XmlElement(name="journal_append_latency")
+    public TimeMetricDTO journal_append_latency;
+
+    // Latency metric for applying updates to the LevelDB index.
+    @XmlElement(name="index_update_latency")
+    public TimeMetricDTO index_update_latency;
+
+    @XmlElement(name="message_load_batch_size")
+    public IntMetricDTO message_load_batch_size;
+
+    // NOTE(review): XML element name ("leveldb_stats") differs from the field
+    // name ("index_stats"); confirm consumers expect "leveldb_stats".
+    @XmlElement(name="leveldb_stats")
+    public String index_stats;
+
+    // NOTE(review): element name "last_checkpoint_pos" differs from field name
+    // "index_snapshot_pos"; presumably the log position of the last index
+    // snapshot -- confirm against LevelDBClient.last_index_snapshot_pos.
+    @XmlElement(name="last_checkpoint_pos")
+    public long index_snapshot_pos;
+
+    // Timestamp of the last garbage collection pass.
+    @XmlElement(name="last_gc_ts")
+    public long last_gc_ts;
+
+    // True while a garbage collection pass is running.
+    @XmlElement(name="in_gc")
+    public boolean in_gc;
+
+    @XmlElement(name="last_gc_duration")
+    public long last_gc_duration;
+
+    // NOTE(review): element name "last_append_pos" differs from field name
+    // "log_append_pos".
+    @XmlElement(name="last_append_pos")
+    public long log_append_pos;
+
+    @XmlElement(name="log_stats")
+    public String log_stats;
+
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/package-info.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/package-info.java?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/package-info.java (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/package-info.java Tue Nov 22 14:40:18 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The JAXB POJOs for the
+ * <a href="http://activemq.apache.org/schema/activemq/apollo/xml-configuration.html">XML Configuration</a>
+ * of the ActiveMQ Broker.
+ */
+// NOTE(review): the schema namespace below is a fabric.fusesource.org URI
+// rather than an activemq.apache.org one -- confirm this is the intended
+// namespace for the leveldb store's configuration elements.
+@javax.xml.bind.annotation.XmlSchema(
+        namespace = "http://fabric.fusesource.org/apollo",
+        elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.apache.activemq.apollo.broker.store.leveldb.dto;
+

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ExtensionJaxbModule.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ExtensionJaxbModule.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ExtensionJaxbModule.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ExtensionJaxbModule.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import org.apache.activemq.apollo.util.JaxbModule
+
+
+/**
+ * Exposes the leveldb DTO package to Apollo's JAXB machinery (registered via
+ * the META-INF/services jaxb-module.index entry added in this commit) so the
+ * broker can (de)serialize the leveldb store's XML configuration elements.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ExtensionJaxbModule extends JaxbModule {
+  // Java package containing the JAXB-annotated DTO classes.
+  def xml_package = "org.apache.activemq.apollo.broker.store.leveldb.dto"
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import org.fusesource.hawtbuf._
+import java.util.concurrent.TimeUnit
+import org.iq80.leveldb._
+import org.fusesource.leveldbjni.JniDBFactory._
+import java.io.{DataOutput, DataOutputStream}
+
+object HelperTrait {
+
+  // Key/value codec helpers plus a thin Scala wrapper (RichDB) around the
+  // LevelDB JNI API, shared by the LevelDB store implementation.
+
+  // Encode a Long as a hawtbuf variable-length long (sized exactly).
+  def encode(a1:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.getData
+  }
+
+  // Inverse of encode(Long): decode a var-long from the start of `bytes`.
+  def decode_long(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  // Key layout: 1-byte type prefix followed by a fixed 8-byte long.
+  def encode(a1:Byte, a2:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(9)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.getData
+  }
+
+  // Key layout: 1-byte type prefix followed by the raw buffer contents.
+  def encode(a1:Byte, a2:Buffer):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(1+a2.length)
+    out.writeByte(a1.toInt)
+    a2.writeTo(out.asInstanceOf[DataOutput])
+    out.getData
+  }
+
+  // Inverse of encode(Byte, Long).
+  def decode_long_key(bytes:Array[Byte]):(Byte, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong())
+  }
+
+  // Key layout: 1-byte type prefix followed by two fixed 8-byte longs.
+  def encode(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(17)
+    out.writeByte(a1)
+    out.writeLong(a2)
+    out.writeLong(a3)
+    out.getData
+  }
+
+  // Inverse of encode(Byte, Long, Long).
+  def decode_long_long_key(bytes:Array[Byte]):(Byte,Long,Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong(), in.readLong())
+  }
+
+  // Key layout: 1-byte type prefix followed by a fixed 4-byte int.
+  def encode(a1:Byte, a2:Int):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(5)
+    out.writeByte(a1)
+    out.writeInt(a2)
+    out.getData
+  }
+
+  // Inverse of encode(Byte, Int).
+  def decode_int_key(bytes:Array[Byte]):(Byte,Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readInt())
+  }
+
+  /**
+   * Convenience wrapper around the LevelDB JNI `DB` handle: Option-returning
+   * get, default Read/WriteOptions, and loan-pattern helpers that always
+   * close iterators, snapshots and write batches.
+   */
+  final class RichDB(val db: DB) {
+
+    def getProperty(name:String) = db.getProperty(name)
+
+    def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
+
+    // Returns None when the key is not present (maps LevelDB's null result).
+    def get(key:Array[Byte], ro:ReadOptions=new ReadOptions):Option[Array[Byte]] = {
+      Option(db.get(key, ro))
+    }
+
+    def close:Unit = db.close()
+
+    def delete(key:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.delete(key, wo)
+    }
+
+    def put(key:Array[Byte], value:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.put(key, value, wo)
+    }
+
+    // Runs `func` against a fresh WriteBatch, commits the batch atomically,
+    // and always closes it.
+    def write[T](wo:WriteOptions=new WriteOptions)(func: WriteBatch=>T):T = {
+      val updates = db.createWriteBatch()
+      try {
+
+        val rc=Some(func(updates))
+        db.write(updates, wo)
+        return rc.get
+      } finally {
+        updates.close();
+      }
+    }
+
+    // Runs `func` against a point-in-time snapshot; the snapshot is always released.
+    def snapshot[T](func: Snapshot=>T):T = {
+      val snapshot = db.getSnapshot
+      try {
+        func(snapshot)
+      } finally {
+        snapshot.close()
+      }
+    }
+
+    // Iterates all keys in order; `func` returns false to stop early.
+    def cursor_keys(ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seekToFirst();
+      try {
+        while( iterator.hasNext && func(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    // Iterates keys starting with `prefix`; `func` returns false to stop early.
+    def cursor_keys_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(prefix);
+      try {
+        def check(key:Array[Byte]) = {
+          key.startsWith(prefix) && func(key)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    // As cursor_keys_prefixed, but also hands the value to `func`.
+    def cursor_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(prefix);
+      try {
+        def check(key:Array[Byte]) = {
+          key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    // Lexicographic byte-array comparison via hawtbuf Buffer.
+    def compare(a1:Array[Byte], a2:Array[Byte]):Int = {
+      new Buffer(a1).compareTo(new Buffer(a2))
+    }
+
+    // Iterates keys in [start_included, end_excluded); `func` returns false to stop.
+    def cursor_range_keys(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(start_included);
+      try {
+        def check(key:Array[Byte]) = {
+          (compare(key,end_excluded) < 0) && func(key)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    // As cursor_range_keys, but also hands the value to `func`.
+    def cursor_range(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(start_included);
+      try {
+        def check(key:Array[Byte]) = {
+          (compare(key,end_excluded) < 0) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    // Finds the greatest existing key that starts with `prefix`, if any.
+    def last_key(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
+      // Increment the last prefix byte so we can seek just past the prefix range.
+      // NOTE(review): a trailing 0xff byte wraps to 0x00 here -- confirm keys
+      // with such prefixes cannot occur in this store's key-space.
+      val copy = new Buffer(prefix).deepCopy().data
+      if ( copy.length > 0 ) {
+        val pos = copy.length-1
+        copy(pos) = (copy(pos)+1).toByte
+      }
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seek(copy);
+        // Step back to the entry just before the incremented prefix; if there
+        // is no previous entry, fall back to the last entry of the DB.
+        if ( iterator.hasPrev ) {
+          iterator.prev()
+        } else {
+          iterator.seekToLast()
+        }
+        
+        if ( iterator.hasNext ) {
+          val key = iterator.peekNext.getKey
+          if(key.startsWith(prefix)) {
+            Some(key)
+          } else {
+            None
+          } 
+        } else {
+          None
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+  }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import java.util.ArrayList
+import java.util.Iterator
+import java.util.List
+import java.util.NoSuchElementException
+import org.apache.activemq.apollo.util.TreeMap
+
+object Interval {
+  // Convenience constructor for a single-value interval: [start, start+1).
+  def apply[N](start:N)(implicit numeric: scala.math.Numeric[N]):Interval[N] = {
+    import numeric._
+    Interval(start, start+one)
+  }
+}
+
+/**
+ * A half-open numeric interval [start, limit): `start` is included,
+ * `limit` is excluded.
+ */
+case class Interval[N](start: N, limit: N)(implicit numeric: scala.math.Numeric[N]) {
+  import numeric._
+
+  // Number of values covered by the interval.
+  def size = limit - start
+  // Last value included in the interval (limit - 1).
+  def end = limit - one
+
+  // Functional updates: return a new Interval with one bound replaced.
+  def start(value: N):Interval[N] = Interval(value, limit)
+  def limit(value: N):Interval[N] = Interval(start, value)
+  
+  // Single-value intervals render as "5"; wider ones as "5-9" (inclusive end).
+  override def toString = {
+    if (start == end) {
+      start.toString
+    } else {
+      start.toString + "-" + end
+    }
+  }
+
+  def contains(value: N): Boolean = {
+    return start <= value && value < limit
+  }
+}
+
+/**
+ * Tracks numeric ranges.  Handy for keeping track of things like allocation or free lists.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class IntervalSet[N](implicit numeric: scala.math.Numeric[N]) extends java.lang.Iterable[Interval[N]] {
+  import numeric._
+  import collection.JavaConversions._
+  // Sorted map of interval start -> interval; entries never overlap and are
+  // coalesced on add / split on remove.
+  private final val ranges = new TreeMap[N, Interval[N]]
+
+  // Deep copy of this set (the Interval values themselves are immutable).
+  def copy = {
+    val rc = new IntervalSet[N]
+    for (r <- iterator) {
+      rc.ranges.put(r.start, Interval(r.start, r.limit))
+    }
+    rc
+  }
+  def add(r:N):Unit = add(Interval(r))
+  // Adds the interval, merging it with any overlapping or adjacent entries.
+  def add(r:Interval[N]): Unit = {
+    var start = r.start
+    var limit = r.limit
+
+    var entry = ranges.floorEntry(limit)
+    while (entry != null) {
+      var curr = entry
+      var range = curr.getValue
+      entry = entry.previous
+      if (range.limit < start) {
+        entry = null
+      } else {
+        if (limit < range.limit) {
+          limit = range.limit
+        }
+        if (start < range.start) {
+          ranges.removeEntry(curr)
+        } else {
+          curr.setValue(range.limit(limit))
+          return
+        }
+      }
+    }
+    ranges.put(start, Interval(start, limit))
+  }
+
+  def remove(r:N):Unit = remove(Interval(r))
+  // Removes the interval, trimming or splitting any entries it intersects.
+  def remove(r:Interval[N]): Unit = {
+    val start = r.start
+    var limit = r.limit
+    var entry = ranges.lowerEntry(limit)
+    while (entry != null) {
+
+      var curr = entry
+      var range = curr.getValue
+      entry = entry.previous
+
+      if (range.limit <= start) {
+        entry = null
+      } else {
+        if (limit < range.limit) {
+          ranges.put(limit, Interval(limit, range.limit))
+        }
+        if (start <= range.start) {
+          ranges.removeEntry(curr)
+        } else {
+          curr.setValue(range.limit(start))
+          entry = null
+        }
+      }
+    }
+  }
+
+  def contains(value: N) = {
+    var entry = ranges.floorEntry(value)
+    if (entry == null) {
+      false
+    } else {
+      entry.getValue.contains(value)
+    }
+  }
+
+  def clear: Unit = ranges.clear
+
+  // Replaces this set's contents with a copy of `source`.
+  def copy(source: IntervalSet[N]): Unit = {
+    ranges.clear
+    for (entry <- source.ranges.entrySet) {
+      ranges.put(entry.getKey, entry.getValue)
+    }
+  }
+
+  // Total number of individual values covered by all intervals.
+  def size = {
+    var rc = 0
+    var entry = ranges.firstEntry
+    while (entry != null) {
+      rc += entry.getValue.size.toInt()
+      entry = entry.next
+    }
+    rc
+  }
+
+  def toArrayList = {
+    new ArrayList(ranges.values)
+  }
+
+  override def toString = {
+    "[ " + ranges.values().mkString(", ")+" ]"
+  }
+
+  def iterator: Iterator[Interval[N]] = {
+    return ranges.values.iterator
+  }
+
+  // All individual values, materialized.
+  def values: List[N] = {
+    var rc = new ArrayList[N]
+    for (i <- new ValueIterator(iterator)) {
+      rc.add(i)
+    }
+    return rc
+  }
+
+  def valueIterator: Iterator[N] = new ValueIterator(iterator)
+
+  def valuesIteratorNotInInterval(r: Interval[N]): Iterator[N] = new ValueIterator(iteratorNotInInterval(r))
+
+  def isEmpty = ranges.isEmpty
+
+  // Iterates the gaps (intervals NOT in this set) that fall within `mask`.
+  def iteratorNotInInterval(mask: Interval[N]): java.util.Iterator[Interval[N]] = {
+    return new Iterator[Interval[N]] {
+      private var iter = ranges.values.iterator
+      private var last = new Interval(mask.start, mask.start)
+      private var _next: Interval[N] = null
+
+      def hasNext: Boolean = {
+        // Fixed: was `next==null`, which invoked the next() method below
+        // (mutual recursion with hasNext) instead of testing the `_next` field.
+        while (_next == null && last.limit < mask.limit && iter.hasNext) {
+          var r = iter.next
+          if (r.limit >= last.limit) {
+            if (r.start < last.limit) {
+              last = new Interval(last.start, r.limit)
+            } else {
+              if (r.start < mask.limit) {
+                _next = new Interval(last.limit, r.start)
+              } else {
+                _next = new Interval(last.limit, mask.limit)
+              }
+            }
+          }
+        }
+        // NOTE(review): a trailing gap after the last stored interval (up to
+        // mask.limit) is never emitted; confirm whether that is intended.
+        return _next != null
+      }
+
+      def next: Interval[N] = {
+        if (!hasNext) {
+          throw new NoSuchElementException
+        }
+        // Fixed: was `last = next`, a recursive call; consume `_next` instead.
+        last = _next
+        _next = null
+        return last
+      }
+
+      def remove: Unit = {
+        throw new UnsupportedOperationException
+      }
+    }
+  }
+
+  // Flattens a sequence of intervals into the individual values they cover.
+  private final class ValueIterator(val ranges:Iterator[Interval[N]]) extends java.util.Iterator[N] {
+
+    private var range: Interval[N] = null
+    private var _next: Option[N] = None
+    private var last: N = zero
+
+    def hasNext: Boolean = {
+      if (_next == None) {
+        // Fixed: was `if (Interval == null)` -- that compares the Interval
+        // companion object (never null), so `range` was never fetched and
+        // `range.limit` below threw NPE on first use. Test the current range.
+        if (range == null) {
+          if (ranges.hasNext) {
+            range = ranges.next
+            _next = Some(range.start)
+          } else {
+            return false
+          }
+        } else {
+          _next = Some(last + one)
+        }
+        // When we hand out the last value of the range, drop it so the next
+        // call advances to the following range.
+        if (_next.get == (range.limit - one)) {
+          range = null
+        }
+      }
+      return _next.isDefined
+    }
+
+    def next: N = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      last = _next.get
+      _next = None
+      return last
+    }
+
+    def remove: Unit = throw new UnsupportedOperationException
+
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1205020&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Nov 22 14:40:18 2011
@@ -0,0 +1,939 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+import org.apache.activemq.apollo.broker.store.PBSupport._
+
+import org.apache.activemq.apollo.broker.store._
+import java.io._
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util._
+import collection.mutable.{HashMap, ListBuffer}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
+import collection.immutable.TreeMap
+import org.iq80.leveldb._
+import org.fusesource.leveldbjni.JniDBFactory._
+import org.fusesource.leveldbjni.internal.Util
+import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import org.apache.activemq.apollo.broker.store.MapEntryPB.Bean
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBClient extends Log {
+
+  // Single-byte key-space prefixes: each record type gets its own prefix so
+  // one type's records can be scanned together with a prefixed cursor.
+  final val message_prefix = 'm'.toByte
+  final val queue_prefix = 'q'.toByte
+  final val queue_entry_prefix = 'e'.toByte
+  final val map_prefix = 'p'.toByte
+
+  final val message_prefix_array = Array(message_prefix)
+  final val queue_prefix_array = Array(queue_prefix)
+  final val map_prefix_array = Array(map_prefix)
+  final val queue_entry_prefix_array = Array(queue_entry_prefix)
+  // Index key flagging whether the index was cleanly closed ("true"/"false").
+  final val dirty_index_key = bytes(":dirty")
+
+  // Opcodes for records appended to the write-ahead log; replayed against the
+  // index during recovery (see LevelDBClient.start).
+  final val LOG_ADD_QUEUE           = 1.toByte
+  final val LOG_REMOVE_QUEUE        = 2.toByte
+  final val LOG_ADD_MESSAGE         = 3.toByte
+  final val LOG_REMOVE_MESSAGE      = 4.toByte
+  final val LOG_ADD_QUEUE_ENTRY     = 5.toByte
+  final val LOG_REMOVE_QUEUE_ENTRY  = 6.toByte
+  final val LOG_MAP_ENTRY           = 7.toByte
+
+  final val LOG_SUFFIX  = ".log"
+  final val INDEX_SUFFIX  = ".index"
+
+  import FileSupport._
+  // Sequence files are named by the 16-digit hex of their starting position,
+  // so lexical order matches position order.
+  def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
+
+  // Maps starting position -> file for every file in `directory` ending with
+  // `suffix` whose base name parses as hex; other files are ignored.
+  def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
+    TreeMap((directory.list_files.flatMap { f=>
+      if( f.getName.endsWith(suffix) ) {
+        try {
+          val base = f.getName.stripSuffix(suffix)
+          val position = java.lang.Long.parseLong(base, 16);
+          Some(position -> f)
+        } catch {
+          case e:NumberFormatException => None
+        }
+      } else {
+        None
+      }
+    }): _* )
+  }
+
+  // Per-log-file record count and byte total; used by the GC accounting
+  // (see gc_detected_log_usage in LevelDBClient).
+  case class UsageCounter() {
+    var count = 0L
+    var size = 0L
+    def increment(value:Int) = {
+      count += 1
+      size += value
+    }
+  }
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBClient(store: LevelDBStore) {
+
+  import HelperTrait._
+  import LevelDBClient._
+  import FileSupport._
+
+  def dispatchQueue = store.dispatch_queue
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Helpers
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def config = store.config
+  def directory = config.directory
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Public interface used by the LevelDBStore
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  var sync = false;
+  var verify_checksums = false;
+
+  var log:RecordLog = _
+
+  var index:RichDB = _
+  var index_options:Options = _
+
+  var last_index_snapshot_pos:Long = _
+  val snapshot_rw_lock = new ReentrantReadWriteLock(true)
+
+  var last_gc_ts = 0L
+  var last_gc_duration = 0L
+  var in_gc = false
+  var gc_detected_log_usage = Map[Long, UsageCounter]()
+
+  def dirty_index_file = directory / ("dirty"+INDEX_SUFFIX)
+  def temp_index_file = directory / ("temp"+INDEX_SUFFIX)
+  def snapshot_index_file(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
+
+  def create_log: RecordLog = {
+    new RecordLog(directory, LOG_SUFFIX)
+  }
+
+  def log_size = {
+    import OptionSupport._
+    config.log_size.getOrElse(1024 * 1024 * 100)
+  }
+
+  def start() = {
+    import OptionSupport._
+
+    sync = config.sync.getOrElse(true);
+    verify_checksums = config.verify_checksums.getOrElse(false);
+
+    index_options = new Options();
+    index_options.createIfMissing(true);
+
+    config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
+    config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
+    config.paranoid_checks.foreach( index_options.paranoidChecks(_) )
+    config.index_write_buffer_size.foreach( index_options.writeBufferSize(_) )
+    config.index_block_size.foreach( index_options.blockSize(_) )
+    Option(config.index_compression).foreach(x => index_options.compressionType( x match {
+      case "snappy" => CompressionType.SNAPPY
+      case "none" => CompressionType.NONE
+      case _ => CompressionType.SNAPPY
+    }) )
+
+    index_options.cacheSize(config.index_cache_size.getOrElse(1024*1024*256L))
+    index_options.logger(new Logger() {
+      def log(msg: String) = debug(store.store_kind+": "+msg)
+    })
+
+    log = create_log
+    log.write_buffer_size = config.log_write_buffer_size.getOrElse(1024*1024*4)
+    log.log_size = log_size
+    log.on_log_rotate = ()=> {
+      // lets queue a request to checkpoint when
+      // the logs rotate.. queue it on the GC thread since GC's lock
+      // the index for a long time.
+      store.gc_executor {
+        snapshot_index
+      }
+    }
+
+    retry {
+      log.open
+    }
+
+    // Find out what was the last snapshot.
+    val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+    var last_snapshot_index = snapshots.lastOption
+    last_index_snapshot_pos = last_snapshot_index.map(_._1).getOrElse(0)
+
+    // Only keep the last snapshot..
+    snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach( _._2.recursive_delete )
+    temp_index_file.recursive_delete
+
+    retry {
+
+      // Delete the dirty indexes
+      dirty_index_file.recursive_delete
+      dirty_index_file.mkdirs()
+
+      last_snapshot_index.foreach { case (id, file) =>
+        // Resume log replay from a snapshot of the index..
+        try {
+          file.list_files.foreach { file =>
+            Util.link(file, dirty_index_file / file.getName)
+          }
+        } catch {
+          case e:Exception =>
+            warn(e, "Could not recover snapshot of the index: "+e)
+            last_snapshot_index  = None
+        }
+      }
+
+      index = new RichDB(factory.open(dirty_index_file, index_options));
+      try {
+        index.put(dirty_index_key, bytes("true"))
+        // Update the index /w what was stored on the logs..
+        var pos = last_index_snapshot_pos;
+
+        // Replay the log from the last update position..
+        try {
+          while (pos < log.appender_limit) {
+            log.read(pos).map {
+              case (kind, data, len) =>
+                kind match {
+                  case LOG_ADD_MESSAGE =>
+                    val record: MessageRecord = data
+                    index.put(encode(message_prefix, record.key), encode(pos))
+                  case LOG_ADD_QUEUE_ENTRY =>
+                    val record: QueueEntryRecord = data
+                    index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), data)
+                  case LOG_REMOVE_QUEUE_ENTRY =>
+                    index.delete(data)
+                  case LOG_ADD_QUEUE =>
+                    val record: QueueRecord = data
+                    index.put(encode(queue_prefix, record.key), data)
+                  case LOG_REMOVE_QUEUE =>
+                    val ro = new ReadOptions
+                    ro.fillCache(false)
+                    ro.verifyChecksums(verify_checksums)
+                    val queue_key = decode_long(data)
+                    index.delete(encode(queue_prefix, queue_key))
+                    index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) {
+                      key =>
+                        index.delete(key)
+                        true
+                    }
+                  case LOG_MAP_ENTRY =>
+                    val entry = MapEntryPB.FACTORY.parseUnframed(data)
+                    if (entry.getValue == null) {
+                      index.delete(encode(map_prefix, entry.getKey))
+                    } else {
+                      index.put(encode(map_prefix, entry.getKey), entry.getValue.toByteArray)
+                    }
+                  case _ =>
+                  // Skip unknown records like the RecordLog headers.
+                }
+                pos += len
+            }
+          }
+        }
+        catch {
+          case e:Throwable => e.printStackTrace()
+        }
+
+
+      } catch {
+        case e:Throwable =>
+          // replay failed.. good thing we are in a retry block...
+          index.close
+          throw e;
+      }
+    }
+  }
+
+  def stop() = {
+    // this blocks until all io completes..
+    // Suspend also deletes the index.
+    suspend()
+
+    if (log != null) {
+      log.close
+    }
+    copy_dirty_index_to_snapshot
+    log = null
+  }
+
+  def using_index[T](func: =>T):T = {
+    val lock = snapshot_rw_lock.readLock();
+    lock.lock()
+    try {
+      func
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def retry_using_index[T](func: =>T):T = retry(using_index( func ))
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def suspend() = {
+    // Make sure we are the only ones accessing the index. since
+    // we will be closing it to create a consistent snapshot.
+    snapshot_rw_lock.writeLock().lock()
+
+    // Close the index so that it's files are not changed async on us.
+    index.put(dirty_index_key, bytes("false"), new WriteOptions().sync(true))
+    index.close
+  }
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def resume() = {
+    // re=open it..
+    retry {
+      index = new RichDB(factory.open(dirty_index_file, index_options));
+      index.put(dirty_index_key, bytes("true"))
+    }
+    snapshot_rw_lock.writeLock().unlock()
+  }
+
+  def copy_dirty_index_to_snapshot {
+    if( log.appender_limit == last_index_snapshot_pos  ) {
+      // no need to snapshot again...
+      return
+    }
+
+    // Where we start copying files into.  Delete this on
+    // restart.
+    val tmp_dir = temp_index_file
+    tmp_dir.mkdirs()
+
+    try {
+
+      // Hard link all the index files.
+      dirty_index_file.list_files.foreach {
+        file =>
+          Util.link(file, tmp_dir / file.getName)
+      }
+
+      // Rename to signal that the snapshot is complete.
+      val new_snapshot_index_pos = log.appender_limit
+      tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
+      snapshot_index_file(last_index_snapshot_pos).recursive_delete
+      last_index_snapshot_pos = new_snapshot_index_pos
+
+    } catch {
+      case e: Exception =>
+        // if we could not snapshot for any reason, delete it as we don't
+        // want a partial check point..
+        warn(e, "Could not snapshot the index: " + e)
+        tmp_dir.recursive_delete
+    }
+  }
+
+  def snapshot_index:Unit = {
+    if( log.appender_limit == last_index_snapshot_pos  ) {
+      // no need to snapshot again...
+      return
+    }
+    suspend()
+    try {
+      copy_dirty_index_to_snapshot
+    } finally {
+      resume()
+    }
+  }
+
+  def retry[T](func: => T): T = {
+    var error:Throwable = null
+    var rc:Option[T] = None
+
+    // We will loop until the tx succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while(!rc.isDefined) {
+
+      try {
+        rc = Some(func)
+      } catch {
+        case e:Throwable =>
+          if( error==null ) {
+            warn(e, "DB operation failed. (entering recovery mode)")
+          }
+          error = e
+      }
+
+      if (!rc.isDefined) {
+        // We may need to give up if the store is being stopped.
+        if ( !store.service_state.is_started ) {
+          throw error
+        }
+        Thread.sleep(1000)
+      }
+    }
+
+    if( error!=null ) {
+      info("DB recovered from failure.")
+    }
+    rc.get
+  }
+
+  def purge() = {
+    suspend()
+    try{
+      log.close
+      directory.list_files.foreach(_.recursive_delete)
+    } finally {
+      retry {
+        log.open
+      }
+      resume()
+    }
+  }
+
+  def addQueue(record: QueueRecord, callback:Runnable) = {
+    retry_using_index {
+      log.appender { appender =>
+        appender.append(LOG_ADD_QUEUE, record)
+        index.put(encode(queue_prefix, record.key), record)
+      }
+    }
+    callback.run
+  }
+
+  def removeQueue(queue_key: Long, callback:Runnable) = {
+    retry_using_index {
+      log.appender { appender =>
+        val ro = new ReadOptions
+        ro.fillCache(false)
+        ro.verifyChecksums(verify_checksums)
+        appender.append(LOG_REMOVE_QUEUE, encode(queue_key))
+        index.delete(encode(queue_prefix, queue_key))
+        index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) { key=>
+          index.delete(key)
+          true
+        }
+      }
+    }
+    callback.run
+  }
+
  /**
   * Persists a batch of units-of-work.  Every mutation (map entries,
   * message adds, queue-entry adds/removes) is written twice: appended
   * to the record log so recovery can replay it, and applied to the
   * index inside a single write batch.  `callback` runs once the whole
   * batch has been stored.
   */
  def store(uows: Seq[LevelDBStore#DelayableUOW], callback:Runnable) {
    retry_using_index {
      log.appender { appender =>

        var sync_needed = false
        index.write() { batch =>
          uows.foreach { uow =>

            // Key/value "map" mutations; a null value means delete.
            for((key,value) <- uow.map_actions) {
              val entry = new MapEntryPB.Bean()
              entry.setKey(key)
              if( value==null ) {
                batch.delete(encode(map_prefix, key))
              } else {
                entry.setValue(value)
                batch.put(encode(map_prefix, key), value.toByteArray)
              }
              // The log record carries key and (possibly absent) value so
              // replay can redo or undo the same mutation.
              appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
            }

            uow.actions.foreach { case (msg, action) =>
              val message_record = action.message_record
              var pos = 0L
              var pos_buffer:Buffer = null

              if (message_record != null) {
                // The message payload lives in the log; the index only maps
                // message key -> encoded log position.
                pos = appender.append(LOG_ADD_MESSAGE, message_record)
                val pos_encoded = encode(pos)
                pos_buffer = new Buffer(pos_encoded)
                if( message_record.locator !=null ) {
                  // Publish the log position to the in-memory locator so
                  // later loads can skip the index lookup.
                  message_record.locator.set(pos_encoded);
                }
                batch.put(encode(message_prefix, action.message_record.key), pos_encoded)
              }

              action.dequeues.foreach { entry =>
                // Fall back to the entry's own locator when no message was
                // appended in this UOW.
                if( pos_buffer==null && entry.message_locator!=null ) {
                  pos_buffer = entry.message_locator
                }
                val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                // Replay deletes exactly this key (see LOG_REMOVE_QUEUE_ENTRY
                // handling in the recovery path).
                appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                batch.delete(key)
              }

              action.enqueues.foreach { entry =>
                // Propagate the message's log position into the entry record.
                entry.message_locator = pos_buffer
                val encoded:Array[Byte] = entry
                appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
                batch.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
              }
            }
            // NOTE(review): presumably completion listeners require the data
            // to be durable before being notified -- hence the sync below.
            if( !uow.complete_listeners.isEmpty ) {
              sync_needed = true
            }
          }
        }
        // Force the log to disk only when some UOW needs it and syncing
        // is enabled for this store.
        if( sync_needed && sync ) {
          appender.flush
          appender.sync
        }
      }
    }
    callback.run
  }
+
  // Times message loads that go through the index/log lookup path.
  val metric_load_from_index_counter = new TimeCounter
  // Current reading of the counter.
  // NOTE(review): assumes TimeCounter.apply(false) samples without resetting
  // the counter -- confirm against TimeCounter's definition.
  var metric_load_from_index = metric_load_from_index_counter(false)
+
  /**
   * Resolves a batch of message-load requests.  Each request is a tuple
   * of (message key, optional locator caching an encoded log position,
   * completion callback).  The log position comes from the locator when
   * available, otherwise from the message index; the record itself is
   * then read from the log.  Requests that miss on the first pass are
   * retried once against a fresh snapshot, after which the callback
   * fires even if the record is still absent (None).
   */
  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)]):Unit = {

    val ro = new ReadOptions
    ro.verifyChecksums(verify_checksums)
    ro.fillCache(true)

    // First pass: resolve what we can, collecting the requests that missed.
    val missing = retry_using_index {
      index.snapshot { snapshot =>
        ro.snapshot(snapshot)
        requests.flatMap { x =>
          val (message_key, locator, callback) = x
          val record = metric_load_from_index_counter.time {
            var pos = 0L
            var pos_array:Array[Byte] = null
            // Prefer the log position cached in the locator...
            if( locator!=null ) {
              pos_array = locator.get()
              if( pos_array!=null ) {
                pos = decode_long(pos_array)
              }
            }
            // ...falling back to an index lookup by message key.
            if( pos == 0L ) {
              index.get(encode(message_prefix, message_key), ro) match {
                case Some(value) =>
                  pos_array = value
                  pos = decode_long(pos_array)
                case None =>
                  pos = 0L
              }
            }
            if (pos == 0L ) {
              None
            } else {
              // Read the record out of the log and refresh its locator.
              log.read(pos).map { case (prefix, data, _)=>
                val rc:MessageRecord = data
                rc.locator = new AtomicReference[Array[Byte]](pos_array)
                rc
              }
            }
          }
          if( record.isDefined ) {
            callback(record)
            None
          } else {
            // Keep the unresolved request for the second pass.
            Some(x)
          }
        }
      }
    }

    if (missing.isEmpty)
      return

    // There's a small chance that a message was missing, perhaps we started a read tx, before the
    // write tx completed.  Lets try again..
    retry_using_index {
      index.snapshot { snapshot =>
        ro.snapshot(snapshot)
        missing.foreach { x =>
          val (message_key, locator, callback) = x
          val record = metric_load_from_index_counter.time {
            index.get(encode(message_prefix, message_key), ro).flatMap{ pos_array=>
              val pos = decode_long(pos_array)
              log.read(pos).map { case (prefix, data, _)=>
                val rc:MessageRecord = data
                rc.locator = new AtomicReference[Array[Byte]](pos_array)
                rc
              }
            }
          }
          // Second pass is final: the callback fires even when still None.
          callback(record)
        }
      }
    }
  }
+
+  def listQueues: Seq[Long] = {
+    val rc = ListBuffer[Long]()
+    retry_using_index {
+      val ro = new ReadOptions
+      ro.verifyChecksums(verify_checksums)
+      ro.fillCache(false)
+      index.cursor_keys_prefixed(queue_prefix_array, ro) { key =>
+        rc += decode_long_key(key)._2
+        true // to continue cursoring.
+      }
+    }
+    rc
+  }
+
+  def getQueue(queue_key: Long): Option[QueueRecord] = {
+    retry_using_index {
+      val ro = new ReadOptions
+      ro.fillCache(false)
+      ro.verifyChecksums(verify_checksums)
+      index.get(encode(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x)  )
+    }
+  }
+
  /**
   * Scans a queue's entries and summarizes them into ranges of at most
   * `limit` entries each (first/last sequence number, count, total
   * size, earliest non-zero expiration).  Lets callers page through
   * large queues without loading every record.
   */
  def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
    var rc = ListBuffer[QueueEntryRange]()
    val ro = new ReadOptions
    ro.verifyChecksums(verify_checksums)
    ro.fillCache(false)
    retry_using_index {
      index.snapshot { snapshot =>
        ro.snapshot(snapshot)

        // The range currently being accumulated; null between ranges.
        var group:QueueEntryRange = null
        index.cursor_prefixed( encode(queue_entry_prefix, queue_key), ro) { (key, value) =>

          // Third component of the decoded key is the entry sequence number.
          val (_,_,current_key) = decode_long_long_key(key)
          if( group == null ) {
            group = new QueueEntryRange
            group.first_entry_seq = current_key
          }

          val entry:QueueEntryRecord = value

          group.last_entry_seq = current_key
          group.count += 1
          group.size += entry.size

          // Track the earliest non-zero expiration in the range;
          // 0 means "no expiration recorded yet".
          if(group.expiration == 0){
            group.expiration = entry.expiration
          } else {
            if( entry.expiration != 0 ) {
              group.expiration = entry.expiration.min(group.expiration)
            }
          }

          // Close out the range once it reaches the requested size.
          if( group.count == limit) {
            rc += group
            group = null
          }

          true // to continue cursoring.
        }
        // Don't drop a partially-filled trailing range.
        if( group!=null ) {
          rc += group
        }
      }
    }
    rc
  }
+
+  def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+    var rc = ListBuffer[QueueEntryRecord]()
+    val ro = new ReadOptions
+    ro.verifyChecksums(verify_checksums)
+    ro.fillCache(true)
+    retry_using_index {
+      index.snapshot { snapshot =>
+        ro.snapshot(snapshot)
+        val start = encode(queue_entry_prefix, queue_key, firstSeq)
+        val end = encode(queue_entry_prefix, queue_key, lastSeq+1)
+        index.cursor_range( start, end, ro ) { (key, value) =>
+          rc += value
+          true
+        }
+      }
+    }
+    rc
+  }
+
+  def getLastMessageKey:Long = {
+    retry_using_index {
+      index.last_key(message_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+    }
+  }
+
+  def get(key: Buffer):Option[Buffer] = {
+    retry_using_index {
+      index.get(encode(map_prefix, key)).map(new Buffer(_))
+    }
+  }
+
+  def getLastQueueKey:Long = {
+    retry_using_index {
+      index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+    }
+  }
+
  /**
   * Garbage-collects journal (log) files that no longer hold live
   * message data.  Builds a per-journal usage map from the queue
   * entries, deletes message index entries whose log position no longer
   * falls in a live journal, then deletes journals that are empty and
   * safely behind both the last index snapshot and the current appender.
   * Aborts early (mid-cursor) if the store stops.  Updates gc metrics.
   */
  def gc:Unit = {
    var active_counter = 0
    var delete_counter = 0
    val latency_counter = new TimeCounter

    val ro = new ReadOptions()
    ro.fillCache(false)
    ro.verifyChecksums(verify_checksums)

    //
    // This journal_usage will let us get a picture of which queues are using how much of each
    // log file.  It will help folks figure out why a log file is not getting deleted.
    //
    val journal_usage = new ApolloTreeMap[Long,(RecordLog#LogInfo , UsageCounter)]()
    var append_journal = 0L

    log.log_mutex.synchronized {
      // The last journal is the one currently being appended to.
      append_journal = log.log_infos.last._1
      log.log_infos.foreach(entry=> journal_usage.put(entry._1, (entry._2, UsageCounter())) )
    }

    // Maps a log position to the journal containing it (start key ->
    // usage counter), or None when the position is past the journal's limit.
    def find_journal(pos: Long) = {
      var entry = journal_usage.floorEntry(pos)
      if (entry != null) {
        val (info, usageCounter) = entry.getValue()
        if (pos < info.limit) {
          Some(entry.getKey -> usageCounter)
        } else {
          None
        }
      } else {
        None
      }
    }

    in_gc = true
    val now = System.currentTimeMillis()
    debug(store.store_kind+" gc starting")
    latency_counter.time {

      retry_using_index {
        index.snapshot { snapshot =>
          ro.snapshot(snapshot)

          // Figure out which journal files are still in use by which queues.
          index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
            val entry_record:QueueEntryRecord = value
            // Message position: taken from the entry's locator when present,
            // otherwise looked up via the message index.
            val pos = if(entry_record.message_locator!=null) {
              decode_long(entry_record.message_locator.toByteArray)
            } else {
              index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_)).getOrElse(0L)
            }

            find_journal(pos) match {
              case Some((key,usageCounter)) =>
                usageCounter.increment(entry_record.size)
              case None =>
            }

            // only continue while the service is still running..
            store.service_state.is_started
          }

          if (store.service_state.is_started) {

            // Publish the per-journal usage stats for diagnostics.
            gc_detected_log_usage = Map((collection.JavaConversions.asScalaSet(journal_usage.entrySet()).map { x=>
              x.getKey -> x.getValue._2
            }).toSeq : _ * )

            // Take empty journals out of the map..
            val empty_journals = ListBuffer[Long]()

            val i = journal_usage.entrySet().iterator();
            while( i.hasNext ) {
              val (info, usageCounter) = i.next().getValue
              // Never consider the journal currently being appended to.
              if( usageCounter.count==0 && info.position < append_journal) {
                empty_journals += info.position
                i.remove()
              }
            }

            // Drop message index entries whose log position no longer
            // falls inside a live journal.
            index.cursor_prefixed(message_prefix_array) { (key,value) =>
              val pos = decode_long(value)

              if ( !find_journal(pos).isDefined ) {
                // Delete it.
                index.delete(key)
                delete_counter += 1
              } else {
                active_counter += 1
              }
              // only continue while the service is still running..
              store.service_state.is_started
            }

            if (store.service_state.is_started) {
              // We don't want to delete any journals that the index has not
              // snapshot'ed yet, or that are at/after the appender position.
              val delete_limit = find_journal(last_index_snapshot_pos).map(_._1).
                    getOrElse(last_index_snapshot_pos).min(log.appender_start)

              empty_journals.foreach { id =>
                if ( id < delete_limit ) {
                  log.delete(id)
                }
              }
            }
          }
        }


      }
    }
    last_gc_ts=now
    last_gc_duration = latency_counter.total(TimeUnit.MILLISECONDS)
    in_gc = false
    debug(store.store_kind+" gc ended")
  }
+
+
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+    try {
+      retry_using_index {
+        index.snapshot { snapshot=>
+          val ro = new ReadOptions
+          ro.snapshot(snapshot)
+          ro.verifyChecksums(verify_checksums)
+          ro.fillCache(false)
+
+          def write_framed(stream:OutputStream, value:Array[Byte]) = {
+            val helper = new AbstractVarIntSupport {
+              def readByte: Byte = throw new UnsupportedOperationException
+              def writeByte(value: Int) = stream.write(value)
+            }
+            helper.writeVarInt(value.length)
+            stream.write(value);
+            true
+          }
+
+          streams.using_map_stream { stream=>
+            index.cursor_prefixed(map_prefix_array, ro) { (key, value) =>
+              val key_buffer = new Buffer(key)
+              key_buffer.moveHead(1)
+              val record = new MapEntryPB.Bean
+              record.setKey(key_buffer)
+              record.setValue(new Buffer(value))
+              record.freeze().writeFramed(stream)
+              true
+            }
+          }
+
+          streams.using_queue_stream { stream =>
+            index.cursor_prefixed(queue_prefix_array, ro) { (_, value) =>
+              write_framed(stream, value)
+            }
+          }
+
+          streams.using_message_stream { stream=>
+            index.cursor_prefixed(message_prefix_array, ro) { (_, value) =>
+              write_framed(stream, value)
+            }
+          }
+
+          streams.using_queue_entry_stream { stream=>
+            index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) =>
+              write_framed(stream, value)
+            }
+          }
+
+        }
+      }
+      Success(Zilch)
+    } catch {
+      case x:Exception=>
+        Failure(x.getMessage)
+    }
+  }
+
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+    try {
+      purge
+
+      retry_using_index {
+        def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+          var done = false
+          do {
+            try {
+              func(fact.parseFramed(stream).asInstanceOf[Buffer])
+            } catch {
+              case x:EOFException =>
+                done = true
+            }
+          } while( !done )
+        }
+
+        log.appender { appender =>
+          streams.using_map_stream { stream=>
+            foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+              index.put(encode(map_prefix, pb.getKey), pb.getValue.toByteArray)
+            }
+          }
+
+          streams.using_queue_stream { stream=>
+            foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
+              index.put(encode(queue_prefix, record.key), record.toUnframedByteArray)
+            }
+          }
+
+          streams.using_message_stream { stream=>
+            foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
+              val pos = appender.append(LOG_ADD_MESSAGE, record.toUnframedByteArray)
+              index.put(encode(message_prefix, record.key), encode(pos))
+            }
+          }
+
+          streams.using_queue_entry_stream { stream=>
+            foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
+              index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), record.toUnframedByteArray)
+            }
+          }
+        }
+
+      }
+      snapshot_index
+      Success(Zilch)
+
+    } catch {
+      case x:Exception=>
+        Failure(x.getMessage)
+    }
+  }
+}



Mime
View raw message