activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r1476719 [1/2] - in /activemq/trunk: ./ activemq-leveldb-store/ activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/ activemq-leveldb-s...
Date: Sun, 28 Apr 2013 03:53:59 GMT
Author: chirino
Date: Sun Apr 28 03:53:57 2013
New Revision: 1476719

URL: http://svn.apache.org/r1476719
Log:
Adding an initial spike of an M/S replicated LevelDB store. The replication protocol is working nicely; the automated M/S election bits still need to be added.
Move the tests into a test package.
Rename the HA* classes to DFS*.
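
A minimal, hypothetical wiring sketch of the new master store, based only on the bean properties visible in the MasterLevelDBStore and LevelDBStore diffs below (bind, securityToken, minReplica, directory) and the usual ServiceSupport start()/stop() lifecycle; the slave-side counterpart is in part 2/2 of this commit, so it is only referenced in a comment:

    import java.io.File
    import org.apache.activemq.leveldb.replicated.MasterLevelDBStore

    // Hypothetical example: stand up the replication master added by this commit.
    // Setter names follow from the @BeanProperty fields shown in the diffs below.
    object ReplicatedStoreSketch {
      def main(args: Array[String]): Unit = {
        val master = new MasterLevelDBStore
        master.setDirectory(new File("target/master-data")) // inherited from LevelDBStore
        master.setBind("tcp://0.0.0.0:61619")               // replication protocol listener
        master.setSecurityToken("changeme")                 // must match the slave's Login.security_token
        master.setMinReplica(1)                             // wal_sync_to() waits for this many slave acks
        master.start()

        // A SlaveLevelDBStore (added in part 2/2) would connect to master.getPort
        // and ack replicated WAL positions back to the master.

        master.stop()
      }
    }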

Added:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/FileInfo.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/SyncResponse.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Transfer.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/WalAck.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/JsonCodec.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBClient.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/DFSLevelDBStore.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/dfs/IndexManifestDTO.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/ActiveMQScenario.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/DFSLevelDBFastEnqueueTest.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/DFSLevelDBStoreTest.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/EnqueueRateScenariosTest.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/JMSClientScenario.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/LevelDBFastEnqueueTest.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/LevelDBPlistTest.java
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/LevelDBStoreTest.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/Scenario.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/Scenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/test/TestingHDFSServer.scala
      - copied, changed from r1476433, activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala
Removed:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/HALevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/HALevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/Scenario.scala
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala
Modified:
    activemq/trunk/activemq-leveldb-store/pom.xml
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/Log.scala
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-leveldb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/pom.xml?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/pom.xml (original)
+++ activemq/trunk/activemq-leveldb-store/pom.xml Sun Apr 28 03:53:57 2013
@@ -40,11 +40,13 @@
       <version>${scala-version}</version>
       <scope>compile</scope>
     </dependency>
+
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-broker</artifactId>
       <scope>provided</scope>
     </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -101,6 +103,28 @@
       <version>1.5</version>
     </dependency>
 
+    <!-- For Replication -->
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch-transport</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.fabric</groupId>
+      <artifactId>fabric-groups</artifactId>
+      <version>7.2.0.redhat-024</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.fabric</groupId>
+      <artifactId>fabric-linkedin-zookeeper</artifactId>
+      <version>7.2.0.redhat-024</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.fabric</groupId>
+      <artifactId>fabric-zookeeper</artifactId>
+      <version>7.2.0.redhat-024</version>
+    </dependency>
+
     <!-- For Optional Snappy Compression -->
     <dependency>
       <groupId>org.xerial.snappy</groupId>
@@ -316,11 +340,11 @@
             <arg>-deprecation</arg>
           </args>
           <compilerPlugins>
-            <compilerPlugin>
+            <!-- <compilerPlugin>
               <groupId>org.fusesource.jvmassert</groupId>
               <artifactId>jvmassert</artifactId>
               <version>1.4</version>
-            </compilerPlugin>
+            </compilerPlugin> -->
           </compilerPlugins>
         </configuration>
       </plugin>
@@ -336,7 +360,7 @@
           <failIfNoTests>false</failIfNoTests>
           <excludes>
             <exclude>**/EnqueueRateScenariosTest.*</exclude>
-            <exclude>**/HALevelDB*.*</exclude>
+            <exclude>**/DFSLevelDB*.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/FileInfo.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/FileInfo.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/FileInfo.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/FileInfo.java Sun Apr 28 03:53:57 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.leveldb;
+package org.apache.activemq.leveldb.replicated.dto;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -27,17 +27,16 @@ import java.util.Set;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="file_info")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
-
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
-
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
+public class FileInfo {
 
     @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
+    public String file;
+
+    @XmlAttribute(name = "length")
+    public long length;
 
+    @XmlAttribute(name = "crc32")
+    public long crc32;
 }

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java Sun Apr 28 03:53:57 2013
@@ -14,30 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.activemq.leveldb.replicated.dto;
 
-package org.apache.activemq.leveldb;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="log_write")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogWrite {
 
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
+    @XmlAttribute(name="file")
+    public long file;
 
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
+    @XmlAttribute(name="offset")
+    public long offset;
 
-    @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
+    @XmlAttribute(name="length")
+    public long length;
 
 }

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Login.java Sun Apr 28 03:53:57 2013
@@ -15,29 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.leveldb;
+package org.apache.activemq.leveldb.replicated.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="login")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
-
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Login {
 
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
+    @XmlAttribute(name="slave_id")
+    public String slave_id;
 
-    @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
+    @XmlAttribute(name="security_token")
+    public String security_token;
 
 }

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/SyncResponse.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/SyncResponse.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/SyncResponse.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/SyncResponse.java Sun Apr 28 03:53:57 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.leveldb;
+package org.apache.activemq.leveldb.replicated.dto;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -27,17 +27,22 @@ import java.util.Set;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="sync_response")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
+public class SyncResponse {
 
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
+    @XmlAttribute(name = "snapshot_position")
+    public long snapshot_position;
 
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
+    @XmlAttribute(name = "wal_append_position")
+    public long wal_append_position;
 
-    @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
+    @XmlAttribute(name = "index_files")
+    public Set<FileInfo> index_files = new HashSet<FileInfo>();
 
+    @XmlAttribute(name = "log_files")
+    public Set<FileInfo> log_files = new HashSet<FileInfo>();
+
+    @XmlAttribute(name = "append_log")
+    public String append_log;
 }

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Transfer.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Transfer.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Transfer.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/Transfer.java Sun Apr 28 03:53:57 2013
@@ -15,29 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.leveldb;
+package org.apache.activemq.leveldb.replicated.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="transfer_request")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
-
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
-
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
-
-    @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
-
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Transfer {
+    @XmlAttribute(name="file")
+    public String file;
+    @XmlAttribute(name="offset")
+    public long offset;
+    @XmlAttribute(name="length")
+    public long length;
 }

Copied: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/WalAck.java (from r1476433, activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/WalAck.java?p2=activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/WalAck.java&p1=activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java&r1=1476433&r2=1476719&rev=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/IndexManifestDTO.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/WalAck.java Sun Apr 28 03:53:57 2013
@@ -15,29 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.leveldb;
+package org.apache.activemq.leveldb.replicated.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="index_files")
+@XmlRootElement(name="transfer_request")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IndexManifestDTO {
-
-    @XmlAttribute(name = "snapshot_id")
-    public long snapshot_id;
-
-    @XmlAttribute(name = "current_manifest")
-    public String current_manifest;
-
-    @XmlAttribute(name = "file")
-    public Set<String> files = new HashSet<String>();
-
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class WalAck {
+    @XmlAttribute(name="position")
+    public long position;
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Sun Apr 28 03:53:57 2013
@@ -311,7 +311,7 @@ class DelayableUOW(val manager:DBManager
       if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
         asyncCapacityUsed = s
         countDownFuture.countDown
-        manager.parent.broker_service.getTaskRunnerFactory.execute(^{
+        manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })
       } else {
@@ -333,7 +333,7 @@ class DelayableUOW(val manager:DBManager
       } else {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
         countDownFuture.countDown
-        manager.parent.broker_service.getTaskRunnerFactory.execute(^{
+        manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })
       }
@@ -361,7 +361,7 @@ class DBManager(val parent:LevelDBStore)
 
   var lastCollectionKey = new AtomicLong(0)
   var lastPListKey = new AtomicLong(0)
-  val client:LevelDBClient = parent.createClient
+  def client = parent.client
 
   def writeExecutor = client.writeExecutor
   def flushDelay = parent.flushDelay

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Sun Apr 28 03:53:57 2013
@@ -457,6 +457,15 @@ class LevelDBClient(store: LevelDBStore)
   def retry[T](func : =>T):T = RetrySupport.retry(LevelDBClient, store.isStarted, func _)
 
   def start() = {
+    init()
+    replay_init()
+    retry {
+      log.open()
+    }
+    replay_from(lastIndexSnapshotPos, log.appender_limit)
+  }
+
+  def init() ={
 
     // Lets check store compatibility...
     directory.mkdirs()
@@ -530,11 +539,9 @@ class LevelDBClient(store: LevelDBStore)
         snapshotIndex(false)
       }
     }
+  }
 
-    retry {
-      log.open
-    }
-
+  def replay_init() = {
     // Find out what was the last snapshot.
     val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
     var lastSnapshotIndex = snapshots.lastOption
@@ -545,7 +552,6 @@ class LevelDBClient(store: LevelDBStore)
     tempIndexFile.recursiveDelete
 
     retry {
-
       // Setup the plist index.
       plistIndexFile.recursiveDelete
       plistIndexFile.mkdirs()
@@ -567,22 +573,26 @@ class LevelDBClient(store: LevelDBStore)
             lastSnapshotIndex  = None
         }
       }
-
       index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
+      loadCounters
+      index.put(DIRTY_INDEX_KEY, TRUE)
+    }
+  }
+
+  def replay_from(from:Long, limit:Long) = {
+    retry {
       try {
-        loadCounters
-        index.put(DIRTY_INDEX_KEY, TRUE)
         // Update the index /w what was stored on the logs..
-        var pos = lastIndexSnapshotPos;
+        var pos = from;
         var last_reported_at = System.currentTimeMillis();
         var showing_progress = false
         var last_reported_pos = 0L
         try {
-          while (pos < log.appender_limit) {
+          while (pos < limit) {
             val now = System.currentTimeMillis();
             if( now > last_reported_at+1000 ) {
-              val at = pos-lastIndexSnapshotPos
-              val total = log.appender_limit-lastIndexSnapshotPos
+              val at = pos-from
+              val total = limit-from
               val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at)
               val eta = (total-at)/rate
               val remaining = if(eta > 60*60) {
@@ -711,7 +721,12 @@ class LevelDBClient(store: LevelDBStore)
         os.writeObject(v)
       }
       os.close()
-      index.put(key, baos.toByteArray)
+      try {
+        index.put(key, baos.toByteArray)
+      }
+      catch {
+        case e => throw e
+      }
     }
     storeMap(LOG_REF_INDEX_KEY, logRefs)
     storeMap(COLLECTION_META_KEY, collectionMeta)
@@ -742,13 +757,19 @@ class LevelDBClient(store: LevelDBStore)
 
       // this blocks until all io completes..
       // Suspend also deletes the index.
-      suspend()
+      if( index!=null ) {
+        suspend()
+        index = null
+      }
 
-      if (log != null) {
+      if (log.isOpen) {
         log.close
+        copyDirtyIndexToSnapshot
+      }
+      if( plist!=null ) {
+        plist.close
+        plist=null
       }
-      copyDirtyIndexToSnapshot
-      plist.close
       log = null
     }
   }
@@ -793,12 +814,15 @@ class LevelDBClient(store: LevelDBStore)
     snapshotRwLock.writeLock().unlock()
   }
 
-  def copyDirtyIndexToSnapshot {
+  def copyDirtyIndexToSnapshot:Unit = {
     if( log.appender_limit == lastIndexSnapshotPos  ) {
       // no need to snapshot again...
       return
     }
+    copyDirtyIndexToSnapshot(log.appender_limit)
+  }
 
+  def copyDirtyIndexToSnapshot(walPosition:Long):Unit = {
     // Where we start copying files into.  Delete this on
     // restart.
     val tmpDir = tempIndexFile
@@ -812,10 +836,8 @@ class LevelDBClient(store: LevelDBStore)
       }
 
       // Rename to signal that the snapshot is complete.
-      val newSnapshotIndexPos = log.appender_limit
-      tmpDir.renameTo(snapshotIndexFile(newSnapshotIndexPos))
-      snapshotIndexFile(lastIndexSnapshotPos).recursiveDelete
-      lastIndexSnapshotPos = newSnapshotIndexPos
+      tmpDir.renameTo(snapshotIndexFile(walPosition))
+      replaceLatestSnapshotDirectory(walPosition)
 
     } catch {
       case e: Exception =>
@@ -826,16 +848,17 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  def replaceLatestSnapshotDirectory(newSnapshotIndexPos: Long) {
+    snapshotIndexFile(lastIndexSnapshotPos).recursiveDelete
+    lastIndexSnapshotPos = newSnapshotIndexPos
+  }
+
   def snapshotIndex(sync:Boolean=false):Unit = {
     suspend()
     try {
       if( sync ) {
         log.current_appender.force
       }
-      if( log.appender_limit == lastIndexSnapshotPos  ) {
-        // no need to snapshot again...
-        return
-      }
       copyDirtyIndexToSnapshot
     } finally {
       resume()
@@ -849,7 +872,7 @@ class LevelDBClient(store: LevelDBStore)
       locked_purge
     } finally {
       retry {
-        log.open
+        log.open()
       }
       resume()
     }
@@ -1290,8 +1313,9 @@ class LevelDBClient(store: LevelDBStore)
 
     // We don't want to delete any journals that the index has not snapshot'ed or
     // the the
-    val deleteLimit = log.log_info(lastIndexSnapshotPos).map(_.position).
-          getOrElse(lastIndexSnapshotPos).min(log.appender_start)
+
+    var limit = oldest_retained_snapshot
+    val deleteLimit = log.log_info(limit).map(_.position).getOrElse(limit).min(log.appender_start)
 
     emptyJournals.foreach { id =>
       if ( id < deleteLimit ) {
@@ -1300,6 +1324,8 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  def oldest_retained_snapshot = lastIndexSnapshotPos
+
   def removePlist(collectionKey: Long) = {
     val entryKeyPrefix = encodeLong(collectionKey)
     collectionMeta.remove(collectionKey)

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Sun Apr 28 03:53:57 2013
@@ -23,13 +23,12 @@ import org.apache.activemq.openwire.Open
 import org.apache.activemq.usage.SystemUsage
 import java.io.File
 import java.io.IOException
-import java.util.concurrent.{ExecutionException, Future}
+import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
 import reflect.BeanProperty
 import org.apache.activemq.store._
 import java.util._
 import collection.mutable.ListBuffer
-import concurrent.CountDownLatch
 import javax.management.ObjectName
 import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
 import org.apache.activemq.util._
@@ -37,10 +36,34 @@ import org.apache.activemq.leveldb.util.
 import org.apache.activemq.store.PList.PListIterator
 import java.lang
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
+import scala.Some
+import org.apache.activemq.leveldb.CountDownFuture
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.DurableSubscription
+import scala.Some
+import org.apache.activemq.leveldb.CountDownFuture
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.DurableSubscription
+import scala.Some
+import org.apache.activemq.leveldb.CountDownFuture
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.DurableSubscription
+import scala.Some
+import org.apache.activemq.leveldb.CountDownFuture
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.DurableSubscription
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
 
+  lazy val BLOCKING_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), new ThreadFactory() {
+      def newThread(r:Runnable) = {
+          val rc = new Thread(null, r, "ActiveMQ Task");
+          rc.setDaemon(true);
+          rc
+      }
+  })
+
   val DONE = new CountDownFuture();
   DONE.countDown
   
@@ -118,6 +141,7 @@ class LevelDBStore extends LockableServi
 
   final val wireFormat = new OpenWireFormat
   final val db = new DBManager(this)
+  final val client = createClient
 
   @BeanProperty
   var directory = DEFAULT_DIRECTORY
@@ -246,6 +270,14 @@ class LevelDBStore extends LockableServi
 
   def broker_service = brokerService
 
+  def blocking_executor:Executor = {
+    if( broker_service != null ) {
+      broker_service.getTaskRunnerFactory
+    } else {
+      BLOCKING_EXECUTOR
+    }
+  }
+
   def setBrokerName(brokerName: String): Unit = {
   }
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala Sun Apr 28 03:53:57 2013
@@ -104,7 +104,7 @@ case class RecordLog(directory: File, lo
     (checksum.getValue & 0xFFFFFFFF).toInt
   }
 
-  class LogAppender(file:File, position:Long) extends LogReader(file, position) {
+  class LogAppender(file:File, position:Long, var append_offset:Long=0L) extends LogReader(file, position) {
 
     val info = new LogInfo(file, position, 0)
 
@@ -115,7 +115,7 @@ case class RecordLog(directory: File, lo
       super.dispose()
     }
 
-    var append_offset = 0L
+
     val flushed_offset = new AtomicLong(0)
 
     def append_position = {
@@ -124,11 +124,13 @@ case class RecordLog(directory: File, lo
 
     // set the file size ahead of time so that we don't have to sync the file
     // meta-data on every log sync.
-    channel.position(logSize-1)
-    channel.write(new Buffer(1).toByteBuffer)
-    channel.force(true)
-    if( sync ) {
-      channel.position(0)
+    if( append_offset==0 ) {
+      channel.position(logSize-1)
+      channel.write(new Buffer(1).toByteBuffer)
+      channel.force(true)
+      if( sync ) {
+        channel.position(0)
+      }
     }
 
     val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
@@ -143,6 +145,12 @@ case class RecordLog(directory: File, lo
       }
     }
 
+    def skip(length:Long) = this.synchronized {
+      flush
+      append_offset += length
+      flushed_offset.addAndGet(length)
+    }
+
     /**
      * returns the offset position of the data record.
      */
@@ -375,16 +383,16 @@ case class RecordLog(directory: File, lo
     }
   }
 
-  def create_log_appender(position: Long) = {
-    new LogAppender(next_log(position), position)
+  def create_log_appender(position: Long, offset:Long) = {
+    new LogAppender(next_log(position), position, offset)
   }
 
-  def create_appender(position: Long): Any = {
+  def create_appender(position: Long, offset:Long): Any = {
     log_mutex.synchronized {
       if(current_appender!=null) {
         log_infos.put (position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset))
       }
-      current_appender = create_log_appender(position)
+      current_appender = create_log_appender(position, offset)
       log_infos.put(position, new LogInfo(current_appender.file, position, 0))
     }
   }
@@ -393,7 +401,7 @@ case class RecordLog(directory: File, lo
   val max_log_flush_latency = TimeMetric()
   val max_log_rotate_latency = TimeMetric()
 
-  def open = {
+  def open(append_size:Long= -1) = {
     log_mutex.synchronized {
       log_infos.clear()
       LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
@@ -401,31 +409,41 @@ case class RecordLog(directory: File, lo
       }
 
       val appendPos = if( log_infos.isEmpty ) {
-        0L
+        create_appender(0,0)
       } else {
         val file = log_infos.lastEntry().getValue
-        val r = LogReader(file.file, file.position)
-        try {
-          val actualLength = r.verifyAndGetEndPosition
-          val updated = file.copy(length = actualLength - file.position)
-          log_infos.put(updated.position, updated)
-          if( updated.file.length != file.length ) {
-            // we need to truncate.
-            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+        if( append_size == -1 ) {
+          val r = LogReader(file.file, file.position)
+          try {
+            val actualLength = r.verifyAndGetEndPosition
+            val updated = file.copy(length = actualLength - file.position)
+            log_infos.put(updated.position, updated)
+            if( updated.file.length != file.length ) {
+              // we need to truncate.
+              using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+            }
+            create_appender(actualLength,0)
+          } finally {
+            r.release()
           }
-          actualLength
-        } finally {
-          r.release()
+        } else {
+          create_appender(file.position,append_size)
         }
       }
+    }
+  }
 
-      create_appender(appendPos)
+  def isOpen = {
+    log_mutex.synchronized {
+      current_appender!=null;
     }
   }
 
   def close = {
     log_mutex.synchronized {
-      current_appender.release
+      if( current_appender!=null ) {
+        current_appender.release
+      }
     }
   }
 
@@ -450,15 +468,20 @@ case class RecordLog(directory: File, lo
       max_log_rotate_latency {
         log_mutex.synchronized {
           if ( current_appender.append_offset >= logSize ) {
-            current_appender.release()
-            on_log_rotate()
-            create_appender(current_appender.append_position)
+            rotate
           }
         }
       }
     }
   }
 
+
+  def rotate[T] = log_mutex.synchronized {
+    current_appender.release()
+    on_log_rotate()
+    create_appender(current_appender.append_position, 0)
+  }
+
   var on_log_rotate: ()=>Unit = ()=>{}
 
   private val reader_cache = new LRUCache[File, LogReader](100) {

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+
+import org.apache.activemq.leveldb.util._
+
+import FileSupport._
+import java.io._
+import org.apache.activemq.leveldb.{RecordLog, LevelDBClient}
+import java.util
+import org.apache.activemq.leveldb.replicated.dto.{SyncResponse, FileInfo}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object MasterLevelDBClient extends Log {
+
+  val MANIFEST_SUFFIX = ".mf"
+  val LOG_SUFFIX = LevelDBClient.LOG_SUFFIX
+  val INDEX_SUFFIX = LevelDBClient.INDEX_SUFFIX
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MasterLevelDBClient(val store:MasterLevelDBStore) extends LevelDBClient(store) {
+  import MasterLevelDBClient._
+  import collection.JavaConversions._
+
+  var snapshots_pending_delete = new util.TreeSet[Long]()
+
+  def slave_held_snapshots = {
+    val rc = new util.HashSet[Long]()
+    for( v <- store.slaves.values() ; s <- v.held_snapshot ) {
+      rc.add(s)
+    }
+    rc
+  }
+
+  override def replaceLatestSnapshotDirectory(newSnapshotIndexPos: Long) {
+    if( slave_held_snapshots.contains(lastIndexSnapshotPos) ) {
+      // is a slave is holding open a snapshot.. lets not delete it's data just yet...
+      snapshots_pending_delete.add(newSnapshotIndexPos)
+      lastIndexSnapshotPos = newSnapshotIndexPos
+    } else {
+      super.replaceLatestSnapshotDirectory(newSnapshotIndexPos)
+    }
+  }
+
+  override def gc(topicPositions: Seq[(Long, Long)]) {
+    val snapshots_to_rm = new util.HashSet(snapshots_pending_delete)
+    snapshots_to_rm.removeAll(slave_held_snapshots);
+
+    for ( snapshot <- snapshots_to_rm ) {
+      snapshotIndexFile(snapshot).recursiveDelete
+    }
+    super.gc(topicPositions)
+  }
+
+  override def oldest_retained_snapshot: Long = {
+    if ( snapshots_pending_delete.isEmpty ) {
+      super.oldest_retained_snapshot
+    } else {
+      snapshots_pending_delete.first()
+    }
+  }
+
+  def snapshot_state(snapshot_id:Long) = {
+    def info(file:File) = {
+      val rc = new FileInfo
+      rc.file = file.getName
+      rc.length = file.length()
+      rc
+    }
+
+    val rc = new SyncResponse
+    rc.snapshot_position = snapshot_id
+    rc.wal_append_position = log.current_appender.append_position
+
+    for( file <- logDirectory.listFiles; if file.getName.endsWith(LOG_SUFFIX) ) {
+      // Only need to sync up to what's been flushed.
+      val fileInfo = info(file)
+      if( log.current_appender.file == file ) {
+        rc.append_log = file.getName
+        fileInfo.length = log.current_appender.flushed_offset.get()
+        fileInfo.crc32 = file.crc32(fileInfo.length)
+      } else {
+        fileInfo.crc32 = file.cached_crc32
+      }
+      rc.log_files.add(fileInfo)
+    }
+
+    val index_dir = LevelDBClient.create_sequence_file(directory, snapshot_id, INDEX_SUFFIX)
+    if( index_dir.exists() ) {
+      for( file <- index_dir.listFiles ) {
+        val name = file.getName
+        if( name !="LOCK" ) {
+          rc.index_files.add(info(file))
+        }
+      }
+    }
+
+    rc
+  }
+
+
+  // Override the log appender implementation so that it
+  // stores the logs on the local and remote file systems.
+  override def createLog = new RecordLog(directory, LOG_SUFFIX) {
+
+    override def create_log_appender(position: Long, offset:Long) = {
+      new LogAppender(next_log(position), position, offset) {
+
+        val file_name = file.getName
+
+        override def flush = this.synchronized {
+          val offset = flushed_offset.get()
+          super.flush
+          val length = flushed_offset.get() - offset;
+          store.replicate_wal(file, position, offset, length)
+        }
+
+        override def force = {
+          flush
+          store.wal_sync_to(position+flushed_offset.get())
+        }
+
+      }
+    }
+  }
+}

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.util.ServiceStopper
+import org.apache.activemq.leveldb.util.FileSupport._
+import org.apache.activemq.leveldb.util.{JsonCodec, Log}
+import java.util
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.replicated.dto._
+import org.fusesource.hawtdispatch.transport._
+import java.util.concurrent._
+import java.io.{IOException, File}
+import java.net.{InetSocketAddress, URI}
+import java.util.concurrent.atomic.AtomicLong
+import scala.reflect.BeanProperty
+import java.util.UUID
+
+class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
+
+object MasterLevelDBStore extends Log
+
+/**
+ */
+class MasterLevelDBStore extends LevelDBStore {
+
+  import MasterLevelDBStore._
+  import collection.JavaConversions._
+  import ReplicationSupport._
+
+  @BeanProperty
+  var bind = "tcp://0.0.0.0:61619"
+  @BeanProperty
+  var securityToken = ""
+  @BeanProperty
+  var minReplica = 1
+
+  val slaves = new ConcurrentHashMap[String,SlaveState]()
+
+  def replicaId:String = {
+    val replicaid_file = directory / "replicaid.txt"
+    if( replicaid_file.exists() ) {
+      replicaid_file.readText()
+    } else {
+      val rc = UUID.randomUUID().toString
+      replicaid_file.getParentFile.mkdirs()
+      replicaid_file.writeText(rc)
+      rc
+    }
+  }
+
+  override def doStart = {
+    super.doStart
+    start_protocol_server
+  }
+
+  override def doStop(stopper: ServiceStopper): Unit = {
+    if( transport_server!=null ) {
+      transport_server.start(NOOP)
+      transport_server = null
+    }
+    super.doStop(stopper)
+  }
+
+  override def createClient = new MasterLevelDBClient(this)
+  def master_client = client.asInstanceOf[MasterLevelDBClient]
+
+
+  //////////////////////////////////////
+  // Replication Protocol Stuff
+  //////////////////////////////////////
+  var transport_server:TransportServer = _
+
+  def start_protocol_server = {
+    transport_server = new TcpTransportServer(new URI(bind))
+    transport_server.setBlockingExecutor(blocking_executor)
+    transport_server.setDispatchQueue(createQueue("replication server"))
+    transport_server.setTransportServerListener(new TransportServerListener(){
+      def onAccept(transport: Transport) {
+        transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
+        transport.setBlockingExecutor(blocking_executor)
+        new Session(transport)
+      }
+      def onAcceptError(error: Exception) {
+        warn(error)
+      }
+    })
+    val start_latch = new CountDownLatch(1)
+    transport_server.start(^{
+      start_latch.countDown()
+    })
+    start_latch.await()
+  }
+
+  def getPort = transport_server.getSocketAddress.asInstanceOf[InetSocketAddress].getPort
+
+  def stop_protocol_server = {
+    transport_server.stop(NOOP)
+  }
+
+
+  case class HawtCallback[T](cb:(T)=>Unit) extends Function1[T, Unit] {
+    val queue = getCurrentQueue
+    def apply(value:T) = {
+      if( queue==null || queue.isExecuting ) {
+        cb(value)
+      } else {
+        queue {
+          cb(value)
+        }
+      }
+    }
+  }
+
+  class Session(transport: Transport) extends TransportHandler(transport) {
+
+    var login:Login = _
+    var slave_state:SlaveState = _
+    var disconnected = false
+
+    def queue = transport.getDispatchQueue
+
+    override def onTransportFailure(error: IOException) {
+      if( !disconnected ) {
+        warn("Unexpected session error: "+error)
+      }
+      super.onTransportFailure(error)
+    }
+
+    def onTransportCommand(command: Any) = {
+      command match {
+        case command:ReplicationFrame =>
+          command.action match {
+            case LOGIN_ACTION =>
+              handle_login(JsonCodec.decode(command.body, classOf[Login]))
+            case SYNC_ACTION =>
+              handle_sync()
+            case GET_ACTION =>
+              handle_get(JsonCodec.decode(command.body, classOf[Transfer]))
+            case ACK_ACTION =>
+              handle_ack(JsonCodec.decode(command.body, classOf[WalAck]))
+            case DISCONNECT_ACTION =>
+              handle_disconnect()
+            case _ =>
+              sendError("Unknown frame action: "+command.action)
+          }
+      }
+    }
+
+    def handle_login(request:Login):Unit = {
+      if( request.security_token != securityToken ) {
+        sendError("Invalid security_token");
+      } else {
+        login = request;
+        sendOk(null)
+      }
+    }
+
+    override def onTransportDisconnected() {
+      val slave_state = this.slave_state;
+      if( slave_state !=null ) {
+        this.slave_state=null
+        if( slave_state.stop(this) && isStarted ) {
+          slaves.remove(slave_state.slave_id, slave_state)
+        }
+      }
+    }
+
+    def handle_disconnect():Unit = {
+      disconnected = true;
+      sendOk(null)
+    }
+
+    def handle_sync():Unit = {
+      if( login == null ) {
+        sendError("Not logged in")
+        return;
+      }
+      info("handle_sync")
+      slave_state = slaves.get(login.slave_id)
+      if ( slave_state == null ) {
+        slave_state = new SlaveState(login.slave_id)
+        slaves.put(login.slave_id, slave_state)
+      }
+      slave_state.start(Session.this)
+    }
+
+    def handle_ack(req:WalAck):Unit = {
+      if( login == null || slave_state == null) {
+        return;
+      }
+      slave_state.position_update(req.position)
+    }
+
+    def handle_get(req:Transfer):Unit = {
+      if( login == null ) {
+        sendError("Not logged in")
+        return;
+      }
+
+      val file = if( req.file.startsWith("log/" ) ) {
+        client.logDirectory / req.file.stripPrefix("log/")
+      } else {
+        client.directory / req.file
+      }
+
+      if( !file.exists() ) {
+        sendError("file does not exist")
+        return
+      }
+      val length = file.length()
+
+      if( req.offset > length ) {
+        sendError("Invalid offset")
+        return
+      }
+      if( req.offset+req.length > length ) {
+        sendError("Invalid length")
+      }
+      sendOk(null)
+      send(FileTransferFrame(file, req.offset, req.length))
+    }
+
+  }
+
+  class SlaveState(val slave_id:String) {
+
+    var held_snapshot:Option[Long] = None
+    var session:Session = _
+    var position = new AtomicLong(0)
+
+    def start(session:Session) = {
+      info("SlaveState:start")
+
+      val resp = this.synchronized {
+        if( this.session!=null ) {
+          this.session.transport.stop(NOOP)
+        }
+
+        this.session = session
+        val snapshot_id = client.lastIndexSnapshotPos
+        held_snapshot = Option(snapshot_id)
+        position.set(0)
+        master_client.snapshot_state(snapshot_id)
+      }
+      info("Slave has connected: "+slave_id)
+      session.queue {
+        session.sendOk(resp)
+      }
+    }
+
+    def stop(session:Session) = {
+      this.synchronized {
+        if( this.session == session ) {
+          info("Slave has disconnected: "+slave_id)
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame ) = {
+      val h = this.synchronized {
+        session
+      }
+      if( h !=null ) {
+        h.queue {
+          h.send(frame1)
+          h.send(frame2)
+        }
+      }
+    }
+
+    def position_update(position:Long) = {
+      val was = this.position.getAndSet(position)
+      if( was == 0 ) {
+        info("Slave has finished synchronizing: "+slave_id)
+        this.synchronized {
+          this.held_snapshot = None
+        }
+      }
+      check_position_sync
+    }
+
+    @volatile
+    var last_position_sync:PositionSync = null
+    def check_position_sync = {
+      val p = position_sync
+      if( last_position_sync!=p ) {
+        if( position.get >= p.position ) {
+          p.countDown
+          last_position_sync = p
+        }
+      }
+    }
+  }
+
+  @volatile
+  var position_sync = new PositionSync(0L, 0)
+
+  def wal_sync_to(position:Long):Unit = {
+    if( minReplica<1 ) {
+      return
+    }
+    val position_sync = new PositionSync(position, minReplica)
+    this.position_sync = position_sync
+    for( slave <- slaves.values() ) {
+      slave.check_position_sync
+    }
+    while( !position_sync.await(1, TimeUnit.SECONDS) ) {
+      info("Waiting for slaves to ack log position: "+position_sync.position)
+      for( slave <- slaves.values() ) {
+        slave.check_position_sync
+      }
+    }
+  }
+
+  def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
+    if( length > 0 ) {
+      val value = new LogWrite
+      value.file = position;
+      value.offset = offset;
+      value.length = length
+      val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
+      val frame2 = FileTransferFrame(file, offset, length)
+      for( slave <- slaves.values() ) {
+        slave.replicate_wal(frame1, frame2)
+      }
+    }
+  }
+
+}
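
The quorum wait in wal_sync_to above relies on PositionSync, which is defined
elsewhere in this patch and is not shown in this excerpt. Conceptually it is a
CountDownLatch tagged with the WAL position being waited on; a minimal sketch
of that idea, under a hypothetical name so it is not mistaken for the real
class:

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    // Sketch only: pairs a WAL position with a latch sized to the number of
    // replicas (minReplica) that must ack at or beyond that position.
    class PositionSyncSketch(val position: Long, count: Int) {
      private val latch = new CountDownLatch(count)
      def countDown = latch.countDown()
      def await(timeout: Long, unit: TimeUnit): Boolean = latch.await(timeout, unit)
    }

Each slave's check_position_sync counts the latch down at most once per sync
request, once its acked position reaches the requested position, so
wal_sync_to returns only after minReplica slaves have caught up.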

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec.Action
+import java.nio.{MappedByteBuffer, ByteBuffer}
+import org.fusesource.hawtdispatch.Task
+import java.io.{OutputStream, File}
+import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
+import java.util
+
+case class ReplicationFrame(action:AsciiBuffer, body:Buffer)
+case class FileTransferFrame(file:File, offset:Long, length:Long)
+
+class ReplicationProtocolCodec extends AbstractProtocolCodec {
+  import ReplicationSupport._
+  val transfers  = new util.LinkedList[MappedByteBuffer]();
+
+  def encode(value: Any) {
+    value match {
+      case value:ReplicationFrame =>
+        value.action.writeTo(nextWriteBuffer.asInstanceOf[OutputStream])
+        nextWriteBuffer.write('\n');
+        if( value.body!=null ) {
+          value.body.writeTo(nextWriteBuffer.asInstanceOf[OutputStream])
+        }
+        nextWriteBuffer.write(0);
+      case value:FileTransferFrame =>
+        if( value.length > 0 ) {
+          val buffer = map(value.file, value.offset, value.length, true)
+          writeDirect(buffer);
+          if( buffer.hasRemaining ) {
+            transfers.addLast(buffer)
+          } else {
+            unmap(buffer)
+          }
+        }
+      case value:Buffer =>
+        value.writeTo(nextWriteBuffer.asInstanceOf[OutputStream])
+    }
+  }
+
+
+  override def flush(): BufferState = {
+    val rc = super.flush()
+    while( !transfers.isEmpty && !transfers.peekFirst().hasRemaining) {
+      unmap(transfers.removeFirst())
+    }
+    rc
+  }
+
+  def initialDecodeAction() = readHeader
+
+  val readHeader = new Action() {
+    def apply = {
+      val action_line:Buffer = readUntil('\n'.toByte, 80)
+      if( action_line!=null ) {
+        action_line.moveTail(-1);
+        nextDecodeAction = readReplicationFrame(action_line.ascii())
+        nextDecodeAction.apply()
+      } else {
+        null
+      }
+    }
+  }
+
+  def readReplicationFrame(action:AsciiBuffer):Action = new Action() {
+    def apply = {
+      val data:Buffer = readUntil(0.toByte, 1024*64)
+      if( data!=null ) {
+        data.moveTail(-1);
+        nextDecodeAction = readHeader
+        ReplicationFrame(action, data)
+      } else {
+        null
+      }
+    }
+  }
+
+  def readData(data_target:ByteBuffer, cb:Task) = {
+    nextDecodeAction = new Action() {
+      def apply = {
+        if( readDirect(data_target) ) {
+          nextDecodeAction = readHeader
+          cb.run()
+        }
+        null
+      }
+    }
+  }
+}
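
The codec above frames each message as an ASCII action line terminated by
'\n', followed by an optional JSON body terminated by a 0 byte; bulk file data
is streamed after a frame via writeDirect/readData instead of being buffered
through the body. A dependency-free sketch of the byte layout (illustration
only, not part of the patch):

    import java.io.ByteArrayOutputStream

    object FrameLayoutSketch {
      // Builds the bytes of one replication frame: action + '\n' + body + 0x00.
      def frame(action: String, jsonBody: String): Array[Byte] = {
        val out = new ByteArrayOutputStream()
        out.write(action.getBytes("US-ASCII"))
        out.write('\n')
        if (jsonBody != null) out.write(jsonBody.getBytes("UTF-8"))
        out.write(0)
        out.toByteArray
      }

      def main(args: Array[String]): Unit = {
        // e.g. the ack a slave sends after appending a replicated WAL chunk:
        val bytes = frame("ack", """{"position":1024}""")
        println(bytes.length)
      }
    }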

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.hawtbuf.Buffer._
+import java.util.concurrent._
+import java.nio.MappedByteBuffer
+import sun.nio.ch.DirectBuffer
+import java.io.{RandomAccessFile, File}
+import java.nio.channels.FileChannel
+import java.util.concurrent.atomic.AtomicInteger
+import org.fusesource.hawtdispatch._
+
+object ReplicationSupport {
+
+  val WAL_ACTION = ascii("wal")
+  val LOGIN_ACTION = ascii("LevelDB Store Replication v1:login")
+  val SYNC_ACTION = ascii("sync")
+  val GET_ACTION = ascii("get")
+  val ACK_ACTION = ascii("ack")
+  val OK_ACTION = ascii("ok")
+  val DISCONNECT_ACTION = ascii("disconnect")
+  val ERROR_ACTION = ascii("error")
+
+  def unmap(buffer:MappedByteBuffer ) {
+    try {
+      buffer.asInstanceOf[DirectBuffer].cleaner().clean();
+    } catch {
+      case ignore:Throwable =>
+    }
+  }
+
+  def map(file:File, offset:Long, length:Long, readOnly:Boolean) = {
+    val raf = new RandomAccessFile(file, if(readOnly) "r" else "rw");
+    try {
+      val mode = if (readOnly) FileChannel.MapMode.READ_ONLY else FileChannel.MapMode.READ_WRITE
+      raf.getChannel().map(mode, offset, length);
+    } finally {
+      raf.close();
+    }
+  }
+
+  case class RetainedLatch() {
+
+    private val latch = new CountDownLatch(1)
+    private val remaining = new AtomicInteger(1)
+    private val release_task = ^{ release }
+
+    def retain = {
+      remaining.incrementAndGet()
+      release_task
+    }
+
+    def release {
+      if (remaining.decrementAndGet() == 0) {
+        latch.countDown()
+      }
+    }
+
+    def await() = latch.await()
+  }
+
+}
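
RetainedLatch above coordinates an unknown number of asynchronous shutdown
steps: each step retains the latch and receives a Task that releases it when
run, and the creator drops its own initial reference with release before
awaiting. A small usage sketch, assuming the class is on the classpath as
added above:

    import org.apache.activemq.leveldb.replicated.ReplicationSupport.RetainedLatch

    object RetainedLatchSketch {
      def main(args: Array[String]): Unit = {
        val latch = RetainedLatch()
        val done = latch.retain   // Task to hand to an async stop(...) call
        done.run()                // the async step completes and runs the task
        latch.release             // drop the creator's initial reference
        latch.await()             // returns once every reference is released
      }
    }

SlaveLevelDBStore.doStop (below) uses this pattern to wait for both
replication sessions to disconnect before stopping the LevelDB client.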

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
+import org.apache.activemq.util.ServiceStopper
+import java.util
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.replicated.dto._
+import org.fusesource.hawtdispatch.transport._
+import java.net.URI
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.apache.activemq.leveldb.util._
+
+import FileSupport._
+import java.io.{IOException, RandomAccessFile, File}
+import scala.reflect.BeanProperty
+import java.util.UUID
+
+object SlaveLevelDBStore extends Log
+
+/** A replication slave store: syncs missing log and index files from the
+  * master, then applies and acks replicated WAL updates. */
+class SlaveLevelDBStore extends LevelDBStore {
+
+  import SlaveLevelDBStore._
+  import ReplicationSupport._
+  import collection.JavaConversions._
+
+  @BeanProperty
+  var connect = "tcp://0.0.0.0:61619"
+  @BeanProperty
+  var securityToken = ""
+
+  val queue = createQueue("leveldb replication slave")
+  var replay_from = 0L
+  var caughtUp = false
+
+  override def createClient = new SlaveLevelDBClient(this)
+  def slave_client = client.asInstanceOf[SlaveLevelDBClient]
+
+  class SlaveLevelDBClient(val store:SlaveLevelDBStore) extends LevelDBClient(store) {
+  }
+
+  var wal_session:Session = _
+  var transfer_session:Session = _
+
+  def replicaId:String = {
+    val replicaid_file = directory / "replicaid.txt"
+    if( replicaid_file.exists() ) {
+      replicaid_file.readText()
+    } else {
+      val rc = UUID.randomUUID().toString
+      replicaid_file.getParentFile.mkdirs()
+      replicaid_file.writeText(rc)
+      rc
+    }
+  }
+
+  override def doStart() = {
+    client.init()
+
+    if (purgeOnStatup) {
+      purgeOnStatup = false
+      db.client.locked_purge
+      info("Purged: "+this)
+    }
+
+    val transport = new TcpTransport()
+    transport.setBlockingExecutor(blocking_executor)
+    transport.setDispatchQueue(queue)
+    transport.connecting(new URI(connect), null)
+
+    info("Connecting to master...")
+    wal_session = new Session(transport, (session)=>{
+      info("Connected to master.  Syncing")
+      session.request_then(SYNC_ACTION, null) { body =>
+        val response = JsonCodec.decode(body, classOf[SyncResponse])
+        transfer_missing(response)
+        session.handler = wal_handler(session)
+      }
+    })
+  }
+  var stopped = false
+  override def doStop(stopper: ServiceStopper) = {
+    val latch = RetainedLatch()
+    if( wal_session !=null ) {
+      wal_session.disconnect(latch.retain)
+      wal_session = null
+    }
+    if( transfer_session !=null ) {
+      transfer_session.disconnect(latch.retain)
+      transfer_session = null
+    }
+    queue {
+      stopped = true
+      latch.release
+    }
+    // Make sure the sessions are stopped before we close the client.
+    latch.await()
+    db.client.stop()
+  }
+
+  var wal_append_position = 0L
+  var wal_append_offset = 0L
+
+  def send_wal_ack = {
+    queue.assertExecuting()
+    if( caughtUp && !stopped && wal_session!=null) {
+      val ack = new WalAck()
+      ack.position = wal_append_position
+//      info("Sending ack: "+wal_append_position)
+      wal_session.send(ACK_ACTION, ack)
+      if( replay_from != ack.position ) {
+        val old_replay_from = replay_from
+        replay_from = ack.position
+        client.writeExecutor {
+          client.replay_from(old_replay_from, ack.position)
+        }
+      }
+    }
+  }
+
+  def wal_handler(session:Session): (AnyRef)=>Unit = (command)=>{
+    command match {
+      case command:ReplicationFrame =>
+        command.action match {
+          case WAL_ACTION =>
+            val value = JsonCodec.decode(command.body, classOf[LogWrite])
+            if( caughtUp && value.offset ==0 ) {
+              client.log.rotate
+            }
+            val file = client.log.next_log(value.file)
+            val buffer = map(file, value.offset, value.length, false)
+            session.codec.readData(buffer, ^{
+              unmap(buffer)
+//              info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
+              wal_append_offset = value.offset+value.length
+              wal_append_position = value.file + wal_append_offset
+              if( !stopped ) {
+                if( caughtUp ) {
+                  client.log.current_appender.skip(value.length)
+                }
+                send_wal_ack
+              }
+            })
+          case OK_ACTION =>
+            // This comes in as response to a disconnect we send.
+          case _ => session.fail("Unexpected command action: "+command.action)
+        }
+    }
+  }
+
+  class Session(transport:Transport, on_login: (Session)=>Unit) extends TransportHandler(transport) {
+
+    override def onTransportFailure(error: IOException) {
+      if( isStarted ) {
+        warn("Unexpected session error: "+error)
+      }
+      super.onTransportFailure(error)
+    }
+
+    override def onTransportConnected {
+      super.onTransportConnected
+      val login = new Login
+      login.security_token = securityToken
+      login.slave_id = replicaId
+      request_then(LOGIN_ACTION, login) { body =>
+        on_login(Session.this)
+      }
+    }
+
+    def disconnect(cb:Task) = queue {
+      send(DISCONNECT_ACTION, null)
+      transport.flush()
+      transport.stop(cb)
+    }
+
+    def fail(msg:String) = {
+      error(msg)
+      transport.stop(NOOP)
+    }
+
+    var handler: (AnyRef)=>Unit = response_handler
+    def onTransportCommand(command: AnyRef) = handler(command)
+
+    def request_then(action:AsciiBuffer, body:AnyRef)(cb:(Buffer)=>Unit) = {
+      request(action, body){ response =>
+        response.action match {
+          case OK_ACTION =>
+            cb(response.body)
+          case ERROR_ACTION =>
+            fail(action+" failed: "+response.body.ascii().toString)
+          case _ =>
+            fail("Unexpected response action: "+response.action)
+        }
+      }
+    }
+
+    def request(action:AsciiBuffer, body:AnyRef)(cb:(ReplicationFrame)=>Unit) = {
+      response_callbacks.addLast(cb)
+      send(action, body)
+    }
+    val response_callbacks = new util.LinkedList[(ReplicationFrame)=>Unit]()
+    def response_handler: (AnyRef)=>Unit = (command)=> {
+      command match {
+        case command:ReplicationFrame =>
+          if( response_callbacks.isEmpty ) {
+            error("No response callback registered")
+            transport.stop(NOOP)
+          } else {
+            val callback = response_callbacks.removeFirst()
+            callback(command)
+          }
+      }
+    }
+  }
+
+  def transfer_missing(state:SyncResponse) = {
+    // Start up another connection to catch up by
+    // syncing over the missing data.
+    val log_dir = client.logDirectory
+    val dirty_index = client.dirtyIndexFile
+    dirty_index.recursiveDelete
+
+    val snapshot_index = client.snapshotIndexFile(state.snapshot_position)
+
+    val transport = new TcpTransport()
+    transport.setBlockingExecutor(blocking_executor)
+    transport.setDispatchQueue(queue)
+    transport.connecting(new URI(connect), null)
+
+    info("Connecting catchup session...")
+    transfer_session = new Session(transport, (session)=> {
+      info("Catchup session connected...")
+
+      // Transfer the log files..
+      var append_offset = 0L
+      for( x <- state.log_files ) {
+        if( x.file == state.append_log ) {
+          append_offset = x.length
+        }
+
+        val target_file: File = log_dir / x.file
+
+        def previously_downloaded:Boolean = {
+          if( !target_file.exists() )
+            return false
+
+          if (target_file.length() < x.length )
+            return false
+
+          if (target_file.length() == x.length )
+            return target_file.cached_crc32 == x.crc32
+
+          if ( target_file.crc32(x.length) == x.crc32 ) {
+            // we don't want to truncate the log file currently being appended to.
+            if( x.file != state.append_log ) {
+              // Our log file might be longer. lets truncate to match.
+              val raf = new RandomAccessFile(target_file, "rw")
+              try {
+                raf.setLength(x.length)
+              } finally {
+                raf.close();
+              }
+            }
+            return true;
+          }
+          return false
+        }
+
+        // We don't have to transfer log files that have been previously transferred.
+        if( previously_downloaded ) {
+          info("Slave skipping download of: log/"+x.file)
+        } else {
+          val transfer = new Transfer()
+          transfer.file = "log/"+x.file
+          transfer.offset = 0
+          transfer.length = x.length
+          debug("Slave requested: "+transfer.file)
+          session.request_then(GET_ACTION, transfer) { body =>
+            val buffer = map(target_file, 0, x.length, false)
+            session.codec.readData(buffer, ^{
+              unmap(buffer)
+              info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
+            })
+          }
+        }
+      }
+
+      // Transfer the index files..
+      if( !state.index_files.isEmpty ) {
+        dirty_index.mkdirs()
+      }
+      for( x <- state.index_files ) {
+        val transfer = new Transfer()
+        transfer.file = snapshot_index.getName+"/"+x.file
+        transfer.offset = 0
+        transfer.length = x.length
+        info("Slave requested: "+transfer.file)
+        session.request_then(GET_ACTION, transfer) { body =>
+          val buffer = map(dirty_index / x.file, 0, x.length, false)
+          session.codec.readData(buffer, ^{
+            unmap(buffer)
+            info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
+          })
+        }
+      }
+
+      session.request_then(DISCONNECT_ACTION, null) { body =>
+        // Ok we are now caught up.
+        info("Slave has now caught up")
+        transport.stop(NOOP)
+        transfer_session = null
+        replay_from = state.snapshot_position
+        if( wal_append_position < state.wal_append_position ) {
+          wal_append_position = state.wal_append_position
+          wal_append_offset = append_offset
+        }
+        client.writeExecutor {
+          if( !state.index_files.isEmpty ) {
+            client.copyDirtyIndexToSnapshot(state.wal_append_position)
+          }
+          client.replay_init()
+        }
+        caughtUp = true
+        client.log.open(wal_append_offset)
+        send_wal_ack
+      }
+    })
+    state.snapshot_position
+  }
+
+
+}
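
For context, a configuration sketch for the slave side. The matching
MasterLevelDBStore (added in part 1 of this commit) accepts these connections;
its bind settings are not shown in this half of the patch, so only the slave
is sketched and the host, token and directory values below are made up:

    import java.io.File
    import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore

    object SlaveConfigSketch {
      def main(args: Array[String]): Unit = {
        val slave = new SlaveLevelDBStore()
        slave.setConnect("tcp://master-host:61619") // hypothetical master address
        slave.setSecurityToken("secret")            // must match the master's token
        slave.setDirectory(new File("target/slave-data"))
        // In a broker the persistence adapter lifecycle drives start();
        // doStart() above then connects and begins syncing from the master.
        slave.start()
      }
    }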

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.hawtdispatch.transport.{TransportListener, DefaultTransportListener, Transport}
+import java.util
+import org.apache.activemq.leveldb.replicated.ReplicationSupport._
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.util.JsonCodec
+import java.io.IOException
+import org.fusesource.hawtbuf.AsciiBuffer
+
+/** Base class for the replication sessions: wires a Transport to the
+  * ReplicationProtocolCodec and adds framed send/sendOk/sendError helpers. */
+abstract class TransportHandler(val transport: Transport) extends TransportListener {
+
+  var outbound = new util.LinkedList[AnyRef]()
+  val codec = new ReplicationProtocolCodec
+
+  transport.setProtocolCodec(codec)
+  transport.setTransportListener(this)
+  transport.start(NOOP)
+
+  def onTransportConnected = transport.resumeRead()
+  def onTransportDisconnected() = {}
+  def onRefill = drain
+  def onTransportFailure(error: IOException) = transport.stop(NOOP)
+
+  def drain:Unit = {
+    while( !outbound.isEmpty ) {
+      val value = outbound.peekFirst()
+      if( transport.offer(value) ) {
+        outbound.removeFirst()
+      } else {
+        return
+      }
+    }
+  }
+
+  def send(value:AnyRef):Unit = {
+    transport.getDispatchQueue.assertExecuting()
+    outbound.add(value)
+    drain
+  }
+
+  def send(action:AsciiBuffer, body:AnyRef):Unit = send(ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
+  def sendError(error:String) = send(ERROR_ACTION, error)
+  def sendOk(body:AnyRef) = send(OK_ACTION, body)
+
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala Sun Apr 28 03:53:57 2013
@@ -22,6 +22,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.leveldb.LevelDBClient
 import org.fusesource.leveldbjni.internal.Util
 import org.apache.activemq.leveldb.util.ProcessSupport._
+import java.util.zip.CRC32
 
 object FileSupport {
 
@@ -109,6 +110,32 @@ object FileSupport {
       }
     }
 
+    def crc32(limit:Long=Long.MaxValue) = {
+      val checksum =  new CRC32();
+      var remaining = limit;
+      using(new FileInputStream(self)) { in =>
+        val data = new Array[Byte](1024*4)
+        var count = in.read(data, 0, remaining.min(data.length).toInt)
+        while( count > 0 ) {
+          remaining -= count
+          checksum.update(data, 0, count);
+          count = in.read(data, 0, remaining.min(data.length).toInt)
+        }
+      }
+      checksum.getValue()
+    }
+
+    def cached_crc32 = {
+      val crc32_file = new File(self.getParentFile, self.getName+".crc32")
+      if( crc32_file.exists() && crc32_file.lastModified() > self.lastModified() ) {
+        crc32_file.readText().trim.toLong
+      } else {
+        val rc = crc32()
+        crc32_file.writeText(rc.toString)
+        rc
+      }
+    }
+
     def listFiles:Array[File] = {
       Option(self.listFiles()).getOrElse(Array())
     }
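
The new crc32 helpers are what SlaveLevelDBStore.transfer_missing uses to
decide whether a log file was already downloaded: crc32(limit) checksums only
the first limit bytes, and cached_crc32 memoizes the full-file checksum in a
sibling ".crc32" file so it is not recomputed on every restart. A usage sketch
with a made-up path:

    import java.io.File
    import org.apache.activemq.leveldb.util.FileSupport._

    object Crc32Sketch {
      def main(args: Array[String]): Unit = {
        val log = new File("data/leveldb/log/0000000000000000.log") // hypothetical
        if (log.exists()) {
          println("full file crc32:   " + log.cached_crc32)
          println("first 4 KiB crc32: " + log.crc32(4 * 1024))
        }
      }
    }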

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/JsonCodec.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/JsonCodec.scala?rev=1476719&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/JsonCodec.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/JsonCodec.scala Sun Apr 28 03:53:57 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.util
+
+import org.codehaus.jackson.map.ObjectMapper
+import org.fusesource.hawtbuf.{ByteArrayOutputStream, Buffer}
+import java.io.InputStream
+
+/**
+ * Encodes and decodes the JSON bodies of replication frames using Jackson.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object JsonCodec {
+
+  final val mapper: ObjectMapper = new ObjectMapper
+
+  def decode[T](buffer: Buffer, clazz: Class[T]): T = {
+    val original = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(this.getClass.getClassLoader)
+    try {
+      return mapper.readValue(buffer.in, clazz)
+    } finally {
+      Thread.currentThread.setContextClassLoader(original)
+    }
+  }
+
+  def decode[T](is: InputStream, clazz : Class[T]): T = {
+    val original: ClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(this.getClass.getClassLoader)
+    try {
+      return JsonCodec.mapper.readValue(is, clazz)
+    }
+    finally {
+      Thread.currentThread.setContextClassLoader(original)
+    }
+  }
+
+
+  def encode(value: AnyRef): Buffer = {
+    val baos = new ByteArrayOutputStream
+    mapper.writeValue(baos, value)
+    return baos.toBuffer
+  }
+
+}
\ No newline at end of file
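
JsonCodec is the glue between the dto classes added above and the replication
frames: bodies are encoded to a hawtbuf Buffer on send and decoded by class on
receive. A roundtrip sketch, assuming WalAck exposes position as a plain
field, as its use in the store code suggests:

    import org.apache.activemq.leveldb.util.JsonCodec
    import org.apache.activemq.leveldb.replicated.dto.WalAck

    object JsonCodecSketch {
      def main(args: Array[String]): Unit = {
        val ack = new WalAck()
        ack.position = 1024L
        val buffer = JsonCodec.encode(ack)                      // JSON in a Buffer
        val decoded = JsonCodec.decode(buffer, classOf[WalAck])
        assert(decoded.position == ack.position)
      }
    }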

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/Log.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/Log.scala?rev=1476719&r1=1476718&r2=1476719&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/Log.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/Log.scala Sun Apr 28 03:53:57 2013
@@ -35,9 +35,6 @@ object Log {
   def apply(value:Logger):Log = new Log {
     override val log = value
   }
-
-  val exception_id_generator = new AtomicLong(System.currentTimeMillis)
-  def next_exception_id = exception_id_generator.incrementAndGet.toHexString
 }
 
 /**
@@ -47,25 +44,6 @@ trait Log {
   import Log._
   val log = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
 
-  private def with_throwable(e:Throwable)(func: =>Unit) = {
-    if( e!=null ) {
-      val stack_ref = if( log.isDebugEnabled ) {
-        val id = next_exception_id
-        MDC.put("stackref", id.toString);
-        Some(id)
-      } else {
-        None
-      }
-      func
-      stack_ref.foreach { id=>
-        log.debug(e.toString, e)
-        MDC.remove("stackref")
-      }
-    } else {
-      func
-    }
-  }
-
   private def format(message:String, args:Seq[Any]) = {
     if( args.isEmpty ) {
       message
@@ -81,18 +59,14 @@ trait Log {
   }
 
   def error(e: Throwable, m: => String, args:Any*): Unit = {
-    with_throwable(e) {
-      if( log.isErrorEnabled ) {
-        log.error(format(m, args.toSeq))
-      }
+    if( log.isErrorEnabled ) {
+      log.error(format(m, args.toSeq), e)
     }
   }
 
   def error(e: Throwable): Unit = {
-    with_throwable(e) {
-      if( log.isErrorEnabled ) {
-        log.error(e.getMessage)
-      }
+    if( log.isErrorEnabled ) {
+      log.error(e.getMessage, e)
     }
   }
 
@@ -103,18 +77,14 @@ trait Log {
   }
 
   def warn(e: Throwable, m: => String, args:Any*): Unit = {
-    with_throwable(e) {
-      if( log.isWarnEnabled ) {
-        log.warn(format(m, args.toSeq))
-      }
+    if( log.isWarnEnabled ) {
+      log.warn(format(m, args.toSeq), e)
     }
   }
 
   def warn(e: Throwable): Unit = {
-    with_throwable(e) {
-      if( log.isWarnEnabled ) {
-        log.warn(e.toString)
-      }
+    if( log.isWarnEnabled ) {
+      log.warn(e.toString, e)
     }
   }
 
@@ -125,18 +95,14 @@ trait Log {
   }
 
   def info(e: Throwable, m: => String, args:Any*): Unit = {
-    with_throwable(e) {
-      if( log.isInfoEnabled ) {
-        log.info(format(m, args.toSeq))
-      }
+    if( log.isInfoEnabled ) {
+      log.info(format(m, args.toSeq), e)
     }
   }
 
   def info(e: Throwable): Unit = {
-    with_throwable(e) {
-      if( log.isInfoEnabled ) {
-        log.info(e.toString)
-      }
+    if( log.isInfoEnabled ) {
+      log.info(e.toString, e)
     }
   }
 


