activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1477387 - in /activemq/trunk/activemq-leveldb-store: ./ src/main/java/org/apache/activemq/leveldb/replicated/ src/main/scala/org/apache/activemq/leveldb/ src/main/scala/org/apache/activemq/leveldb/replicated/ src/test/java/org/apache/activ...
Date Mon, 29 Apr 2013 22:33:26 GMT
Author: chirino
Date: Mon Apr 29 22:33:26 2013
New Revision: 1477387

URL: http://svn.apache.org/r1477387
Log:
Added a new ElectingLevelDBStore which handles the M/S election bits using ZooKeeper.

Added:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
Modified:
    activemq/trunk/activemq-leveldb-store/pom.xml
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java

Modified: activemq/trunk/activemq-leveldb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/pom.xml?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/pom.xml (original)
+++ activemq/trunk/activemq-leveldb-store/pom.xml Mon Apr 29 22:33:26 2013
@@ -124,6 +124,18 @@
       <artifactId>fabric-zookeeper</artifactId>
       <version>7.2.0.redhat-024</version>
     </dependency>
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+      <version>4.3.1</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.compendium</artifactId>
+      <version>4.3.1</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- For Optional Snappy Compression -->
     <dependency>

Added: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala Mon Apr 29 22:33:26 2013
@@ -0,0 +1,44 @@
+package org.apache.activemq.leveldb.replicated
+
+import scala.reflect.BeanProperty
+import java.util.UUID
+import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.leveldb.util.FileSupport._
+
+/**
+ */
+trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+
+  @BeanProperty
+  var securityToken = ""
+
+  def replicaId:String = {
+    val replicaid_file = directory / "replicaid.txt"
+    if( replicaid_file.exists() ) {
+      replicaid_file.readText()
+    } else {
+      val rc = create_uuid
+      replicaid_file.getParentFile.mkdirs()
+      replicaid_file.writeText(rc)
+      rc
+    }
+  }
+
+  def create_uuid = UUID.randomUUID().toString
+
+  def storeId:String = {
+    val storeid_file = directory / "storeid.txt"
+    if( storeid_file.exists() ) {
+      storeid_file.readText()
+    } else {
+      null
+    }
+  }
+
+  def storeId_=(value:String) {
+    val storeid_file = directory / "storeid.txt"
+    storeid_file.writeText(value)
+  }
+
+
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Mon Apr 29 22:33:26 2013
@@ -91,25 +91,39 @@ object UowCompleted extends UowState {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class CountDownFuture(completed:CountDownLatch=new CountDownLatch(1)) extends java.util.concurrent.Future[Object] {
-  def countDown = completed.countDown()
+class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
+
+  private val latch:CountDownLatch=new CountDownLatch(1)
+  @volatile
+  var value:T = _
+
   def cancel(mayInterruptIfRunning: Boolean) = false
   def isCancelled = false
 
+
+  def completed = latch.getCount()==0
+  def await() = latch.await()
+  def await(p1: Long, p2: TimeUnit) = latch.await(p1, p2)
+
+  def set(v:T) = {
+    value = v
+    latch.countDown()
+  }
+
   def get() = {
-    completed.await()
-    null
+    latch.await()
+    value
   }
 
   def get(p1: Long, p2: TimeUnit) = {
-    if(completed.await(p1, p2)) {
-      null
+    if(latch.await(p1, p2)) {
+      value
     } else {
       throw new TimeoutException
     }
   }
 
-  def isDone = completed.await(0, TimeUnit.SECONDS);
+  def isDone = latch.await(0, TimeUnit.SECONDS);
 }
 
 object UowManagerConstants {
@@ -125,7 +139,7 @@ object UowManagerConstants {
 import UowManagerConstants._
 
 class DelayableUOW(val manager:DBManager) extends BaseRetained {
-  val countDownFuture = CountDownFuture()
+  val countDownFuture = new CountDownFuture[AnyRef]()
   var canceled = false;
 
   val uowId:Int = manager.lastUowId.incrementAndGet()
@@ -310,7 +324,7 @@ class DelayableUOW(val manager:DBManager
       val s = size
       if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
         asyncCapacityUsed = s
-        countDownFuture.countDown
+        countDownFuture.set(null)
         manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })
@@ -332,7 +346,7 @@ class DelayableUOW(val manager:DBManager
         asyncCapacityUsed = 0
       } else {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
-        countDownFuture.countDown
+        countDownFuture.set(null)
         manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Mon Apr 29 22:33:26 2013
@@ -748,7 +748,9 @@ class LevelDBClient(store: LevelDBStore)
     loadMap(LOG_REF_INDEX_KEY, logRefs)
     loadMap(COLLECTION_META_KEY, collectionMeta)
   }
-  
+
+  var wal_append_position = 0L
+
   def stop() = {
     if( writeExecutor!=null ) {
       writeExecutor.shutdown
@@ -765,6 +767,7 @@ class LevelDBClient(store: LevelDBStore)
       if (log.isOpen) {
         log.close
         copyDirtyIndexToSnapshot
+        wal_append_position = log.appender_limit
       }
       if( plist!=null ) {
         plist.close

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Mon Apr 29 22:33:26 2013
@@ -17,41 +17,24 @@
 
 package org.apache.activemq.leveldb
 
-import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
 import org.apache.activemq.command._
 import org.apache.activemq.openwire.OpenWireFormat
 import org.apache.activemq.usage.SystemUsage
 import java.io.File
 import java.io.IOException
 import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.concurrent.atomic.AtomicLong
 import reflect.BeanProperty
 import org.apache.activemq.store._
 import java.util._
 import collection.mutable.ListBuffer
-import javax.management.ObjectName
 import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
 import org.apache.activemq.util._
-import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
+import org.apache.activemq.leveldb.util.{RetrySupport, Log}
 import org.apache.activemq.store.PList.PListIterator
 import java.lang
-import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
+import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -64,8 +47,8 @@ object LevelDBStore extends Log {
       }
   })
 
-  val DONE = new CountDownFuture();
-  DONE.countDown
+  val DONE = new CountDownFuture[AnyRef]();
+  DONE.set(null)
   
   def toIOException(e: Throwable): IOException = {
     if (e.isInstanceOf[ExecutionException]) {
@@ -208,7 +191,6 @@ class LevelDBStore extends LockableServi
   var snappyCompressLogs = false
 
   def doStart: Unit = {
-    import FileSupport._
 
     snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
     debug("starting")
@@ -583,7 +565,7 @@ class LevelDBStore extends LockableServi
 
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
-    def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture = {
+    def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
       uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
     }
 
@@ -606,7 +588,7 @@ class LevelDBStore extends LockableServi
       waitOn(asyncAddQueueMessage(context, message, delay))
     }
 
-    def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture = {
+    def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = {
       uow.dequeue(key, id)
     }
 

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Mon Apr 29 22:33:26 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.fabric.groups._
+import org.fusesource.fabric.zookeeper.internal.ZKClient
+import org.linkedin.util.clock.Timespan
+import scala.reflect.BeanProperty
+import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
+import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
+import java.net.{NetworkInterface, InetAddress}
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.broker.Locker
+import org.apache.activemq.store.PersistenceAdapter
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.activemq.leveldb.util.Log
+import java.io.File
+
+object ElectingLevelDBStore extends Log {
+
+  def machine_hostname: String = {
+    import collection.JavaConversions._
+    // Get the host name of the first non loop-back interface..
+    for (interface <- NetworkInterface.getNetworkInterfaces; if (!interface.isLoopback); inet <- interface.getInetAddresses) {
+      var address = inet.getHostAddress
+      var name = inet.getCanonicalHostName
+      if( address!= name ) {
+        return name
+      }
+    }
+    // Or else just go the simple route.
+    return InetAddress.getLocalHost.getCanonicalHostName;
+  }
+
+}
+
+/**
+ *
+ */
+class ElectingLevelDBStore extends ProxyLevelDBStore {
+  import ElectingLevelDBStore._
+
+  def proxy_target = master
+
+  @BeanProperty
+  var zkAddress = "tcp://127.0.0.1:2888"
+  @BeanProperty
+  var zkPassword:String = _
+  @BeanProperty
+  var zkPath = "/default"
+  @BeanProperty
+  var zkSessionTmeout = "2s"
+
+  var brokerName: String = _
+
+  @BeanProperty
+  var hostname: String = _
+  @BeanProperty
+  var bind = "tcp://0.0.0.0:61619"
+  @BeanProperty
+  var minReplica = 1
+  @BeanProperty
+  var securityToken = ""
+
+  var directory = LevelDBStore.DEFAULT_DIRECTORY;
+  override def setDirectory(dir: File) {
+    directory = dir
+  }
+  override def getDirectory: File = {
+    return directory
+  }
+
+  @BeanProperty
+  var logSize: Long = 1024 * 1024 * 100
+  @BeanProperty
+  var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
+  @BeanProperty
+  var sync: Boolean = true
+  @BeanProperty
+  var verifyChecksums: Boolean = false
+  @BeanProperty
+  var indexMaxOpenFiles: Int = 1000
+  @BeanProperty
+  var indexBlockRestartInterval: Int = 16
+  @BeanProperty
+  var paranoidChecks: Boolean = false
+  @BeanProperty
+  var indexWriteBufferSize: Int = 1024 * 1024 * 6
+  @BeanProperty
+  var indexBlockSize: Int = 4 * 1024
+  @BeanProperty
+  var indexCompression: String = "snappy"
+  @BeanProperty
+  var logCompression: String = "none"
+  @BeanProperty
+  var indexCacheSize: Long = 1024 * 1024 * 256L
+  @BeanProperty
+  var flushDelay = 1000 * 5
+  @BeanProperty
+  var asyncBufferSize = 1024 * 1024 * 4
+  @BeanProperty
+  var monitorStats = false
+
+  def cluster_size_quorum = minReplica + 1
+
+  def cluster_size_max = (minReplica << 2) + 1
+
+  var master: MasterLevelDBStore = _
+  var slave: SlaveLevelDBStore = _
+
+  var zk_client: ZKClient = _
+  var zk_group: Group = _
+  var master_elector: MasterElector = _
+
+  var position: Long = -1L
+
+  def init() {
+
+    // Figure out our position in the store.
+    directory.mkdirs()
+    val log = new RecordLog(directory, LevelDBClient.LOG_SUFFIX)
+    log.logSize = logSize
+    log.open()
+    position = try {
+      log.current_appender.append_position
+    } finally {
+      log.close
+    }
+
+    zk_client = new ZKClient(zkAddress, Timespan.parse(zkSessionTmeout), null)
+    if( zkPassword!=null ) {
+      zk_client.setPassword(zkPassword)
+    }
+    zk_client.start
+    zk_client.waitForConnected(Timespan.parse("30s"))
+
+    val zk_group = ZooKeeperGroupFactory.create(zk_client, zkPath)
+    val master_elector = new MasterElector(this)
+    master_elector.start(zk_group)
+    master_elector.join
+
+    this.setUseLock(true)
+    this.setLocker(createDefaultLocker())
+  }
+
+  def createDefaultLocker(): Locker = new Locker {
+
+    def configure(persistenceAdapter: PersistenceAdapter) {}
+    def setFailIfLocked(failIfLocked: Boolean) {}
+    def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {}
+    def setName(name: String) {}
+
+    def start()  = {
+      master_started_latch.await()
+    }
+
+    def keepAlive(): Boolean = {
+      master_started.get()
+    }
+
+    def stop() {}
+  }
+
+
+  val master_started_latch = new CountDownLatch(1)
+  val master_started = new AtomicBoolean(false)
+
+  def start_master(func: (Int) => Unit) = {
+    assert(master==null)
+    master = create_master()
+    master.blocking_executor.execute(^{
+      master_started.set(true)
+      master.start();
+      master_started_latch.countDown()
+      func(master.getPort)
+    })
+  }
+
+  def isMaster = master_started.get() && !master_stopped.get()
+
+  val stopped_latch = new CountDownLatch(1)
+  val master_stopped = new AtomicBoolean(false)
+
+  def stop_master(func: => Unit) = {
+    assert(master!=null)
+    master.blocking_executor.execute(^{
+      master.stop();
+      master_stopped.set(true)
+      position = master.wal_append_position
+      stopped_latch.countDown()
+      func
+    })
+  }
+
+  protected def doStart() = {
+    master_started_latch.await()
+  }
+
+  protected def doStop(stopper: ServiceStopper) {
+    zk_client.close()
+    zk_client = null
+    if( master_started.get() ) {
+      stopped_latch.countDown()
+    }
+  }
+
+  def start_slave(address: String)(func: => Unit) = {
+    assert(master==null)
+    slave = create_slave()
+    slave.connect = address
+    slave.blocking_executor.execute(^{
+      slave.start();
+      func
+    })
+  }
+
+  def stop_slave(func: => Unit) = {
+    if( slave!=null ) {
+      val s = slave
+      slave = null
+      s.blocking_executor.execute(^{
+        s.stop();
+        position = s.wal_append_position
+        func
+      })
+    }
+  }
+
+  def create_slave() = {
+    val slave = new SlaveLevelDBStore();
+    configure(slave)
+    slave
+  }
+
+  def create_master() = {
+    val master = new MasterLevelDBStore
+    configure(master)
+    master.minReplica = minReplica
+    master.bind = bind
+    master
+  }
+
+  override def setBrokerName(brokerName: String): Unit = {
+    this.brokerName = brokerName
+  }
+
+  def configure(store: ReplicatedLevelDBStoreTrait) {
+    store.directory = directory
+    store.indexFactory = indexFactory
+    store.sync = sync
+    store.verifyChecksums = verifyChecksums
+    store.indexMaxOpenFiles = indexMaxOpenFiles
+    store.indexBlockRestartInterval = indexBlockRestartInterval
+    store.paranoidChecks = paranoidChecks
+    store.indexWriteBufferSize = indexWriteBufferSize
+    store.indexBlockSize = indexBlockSize
+    store.indexCompression = indexCompression
+    store.logCompression = logCompression
+    store.indexCacheSize = indexCacheSize
+    store.flushDelay = flushDelay
+    store.asyncBufferSize = asyncBufferSize
+    store.monitorStats = monitorStats
+    store.securityToken = securityToken
+    store.setBrokerName(brokerName)
+    store.setBrokerService(brokerService)
+  }
+
+  def address(port: Int) = {
+    if (hostname == null) {
+      hostname = machine_hostname
+    }
+    "tcp://" + hostname + ":" + port
+  }
+
+}

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala Mon Apr 29 22:33:26 2013
@@ -0,0 +1,205 @@
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.fabric.groups._
+import org.codehaus.jackson.annotate.JsonProperty
+import org.apache.activemq.leveldb.util.{Log, JsonCodec}
+
+
+class LevelDBNodeState extends NodeState {
+
+  @JsonProperty
+  var id: String = _
+
+  @JsonProperty
+  var address: String = _
+
+  @JsonProperty
+  var position: Long = -1
+
+  @JsonProperty
+  var elected: String = _
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case x:LevelDBNodeState =>
+        x.id == id &&
+        x.address == address &&
+        x.position == position &&
+        x.elected == elected
+      case _ => false
+    }
+  }
+
+  override
+  def toString = JsonCodec.encode(this).ascii().toString
+
+}
+
+object MasterElector extends Log
+
+/**
+ */
+class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[LevelDBNodeState](classOf[LevelDBNodeState]) {
+  
+  import MasterElector._
+
+  var last_state: LevelDBNodeState = _
+  var elected: String = _
+  var position: Long = -1
+  var address: String = _
+  var updating_store = false
+  var next_connect: String = _
+  var connected_address: String = _
+
+  def join: Unit = this.synchronized {
+    last_state = create_state
+    join(last_state)
+    add(changle_listener)
+  }
+
+  def elector  = this
+
+  def update: Unit = elector.synchronized {
+    var next = create_state
+    if (next != last_state) {
+      last_state = next
+      update(next)
+    }
+  }
+
+  def create_state = {
+    val rc = new LevelDBNodeState
+    rc.id = store.brokerName
+    rc.elected = elected
+    rc.position = position
+    rc.address = address
+    rc
+  }
+
+  object changle_listener extends ChangeListener {
+
+    def connected = changed
+    def disconnected = changed
+
+    def changed:Unit = elector.synchronized {
+      // info(eid+" cluster state changed: "+members)
+      if (isMaster) {
+        // We are the master elector, we will choose which node will startup the MasterLevelDBStore
+        members.get(store.brokerName) match {
+          case None =>
+            info("Not enough cluster members connected to elect a new master.")
+          case Some(members) =>
+
+            if (members.size < store.cluster_size_quorum) {
+              info("Not enough cluster members connected to elect a new master.")
+            } else {
+
+              // If we already elected a master, lets make sure he is still online..
+              if (elected != null) {
+                val by_eid = Map(members: _*)
+                if (by_eid.get(elected).isEmpty) {
+                  info("Previously elected master is not online, staring new election")
+                  elected = null
+                }
+              }
+
+              // Do we need to elect a new master?
+              if (elected == null) {
+                // Find the member with the most updates.
+                val sortedMembers = members.filter(_._2.position >= 0).sortWith {
+                  (a, b) => a._2.position > b._2.position
+                }
+                if (sortedMembers.size != members.size) {
+                  info("Not enough cluster members have reported their update positions yet.")
+                } else {
+                  // We now have an election.
+                  elected = sortedMembers.head._1
+                }
+              }
+              // Sort by the positions in the cluster..
+            }
+        }
+      } else {
+        // Only the master sets the elected field.
+        elected = null
+      }
+
+      val master_elected = master.map(_.elected).getOrElse(null) 
+
+      // If no master is currently elected, we need to report our current store position.
+      // Since that will be used to select the master.
+      val connect_target = if (master_elected != null) {
+        position = -1
+        members.get(store.brokerName).get.find(_._1 == master_elected).map(_._2.address).getOrElse(null)
+      } else {
+        // Once we are not running a master or server, report the position..
+        if( connected_address==null && address==null && !updating_store ) {
+          position = store.position
+        }
+        null
+      }
+
+      // Do we need to stop the running master?
+      if (master_elected != eid && address != null && !updating_store) {
+        info("Demoted to slave")
+        updating_store = true
+        store.stop_master {
+          elector.synchronized {
+            info("Master stopped")
+            address = null
+            changed
+          }
+        }
+      }
+
+      // Have we been promoted to being the master?
+      if (master_elected == eid && address==null && !updating_store ) {
+        info("Promoted to master")
+        updating_store = true
+        store.start_master { port =>
+          elector.synchronized {
+            updating_store = false
+            address = store.address(port)
+            info("Master started: "+address)
+            changed
+          }
+        }
+      }
+
+      // Can we become a slave?
+      if (master_elected != eid && address == null) {
+        // Did the master address change?
+        if (connect_target != connected_address) {
+
+          // Do we need to setup a new slave.
+          if (connect_target != null && !updating_store) {
+            updating_store = true
+            store.start_slave(connect_target) {
+              elector.synchronized {
+                updating_store=false
+                info("Slave started")
+                connected_address = connect_target
+                changed
+              }
+            }
+          }
+
+          // Lets stop the slave..
+          if (connect_target == null && !updating_store) {
+            updating_store = true
+            store.stop_slave {
+              elector.synchronized {
+                updating_store=false
+                info("Slave stopped")
+                connected_address = null
+                changed
+              }
+            }
+          }
+        }
+      }
+
+      update
+    }
+  }
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala Mon Apr 29 22:33:26 2013
@@ -28,7 +28,6 @@ import java.io.{IOException, File}
 import java.net.{InetSocketAddress, URI}
 import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
-import java.util.UUID
 
 class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
 
@@ -36,7 +35,7 @@ object MasterLevelDBStore extends Log
 
 /**
  */
-class MasterLevelDBStore extends LevelDBStore {
+class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
 
   import MasterLevelDBStore._
   import collection.JavaConversions._
@@ -45,24 +44,10 @@ class MasterLevelDBStore extends LevelDB
   @BeanProperty
   var bind = "tcp://0.0.0.0:61619"
   @BeanProperty
-  var securityToken = ""
-  @BeanProperty
   var minReplica = 1
 
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
-  def replicaId:String = {
-    val replicaid_file = directory / "replicaid.txt"
-    if( replicaid_file.exists() ) {
-      replicaid_file.readText()
-    } else {
-      val rc = UUID.randomUUID().toString
-      replicaid_file.getParentFile.mkdirs()
-      replicaid_file.writeText(rc)
-      rc
-    }
-  }
-
   override def doStart = {
     super.doStart
     start_protocol_server
@@ -79,7 +64,6 @@ class MasterLevelDBStore extends LevelDB
   override def createClient = new MasterLevelDBClient(this)
   def master_client = client.asInstanceOf[MasterLevelDBClient]
 
-
   //////////////////////////////////////
   // Replication Protocol Stuff
   //////////////////////////////////////
@@ -112,20 +96,6 @@ class MasterLevelDBStore extends LevelDB
     transport_server.stop(NOOP)
   }
 
-
-  case class HawtCallback[T](cb:(T)=>Unit) extends Function1[T, Unit] {
-    val queue = getCurrentQueue
-    def apply(value:T) = {
-      if( queue==null || queue.isExecuting ) {
-        cb(value)
-      } else {
-        queue {
-          cb(value)
-        }
-      }
-    }
-  }
-
   class Session(transport: Transport) extends TransportHandler(transport) {
 
     var login:Login = _
@@ -347,4 +317,6 @@ class MasterLevelDBStore extends LevelDB
     }
   }
 
+  def wal_append_position = client.wal_append_position
+
 }

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala Mon Apr 29 22:33:26 2013
@@ -0,0 +1,114 @@
+package org.apache.activemq.leveldb.replicated
+
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
+import org.apache.activemq.command._
+import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.store._
+import org.apache.activemq.usage.SystemUsage
+import java.io.File
+import java.io.IOException
+import java.util.Set
+import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
+
+/** Forwards the persistence, transaction and PList store operations defined below to `proxy_target`.
+ */
+abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore {
+
+  def proxy_target: LevelDBStore
+
+  def beginTransaction(context: ConnectionContext) {
+    proxy_target.beginTransaction(context)
+  }
+
+  def getLastProducerSequenceId(id: ProducerId): Long = {
+    return proxy_target.getLastProducerSequenceId(id)
+  }
+
+  def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = {
+    return proxy_target.createTopicMessageStore(destination)
+  }
+
+  def setDirectory(dir: File) {
+    proxy_target.setDirectory(dir)
+  }
+
+  def checkpoint(sync: Boolean) {
+    proxy_target.checkpoint(sync)
+  }
+
+  def createTransactionStore: TransactionStore = {
+    return proxy_target.createTransactionStore
+  }
+
+  def setUsageManager(usageManager: SystemUsage) {
+    proxy_target.setUsageManager(usageManager)
+  }
+
+  def commitTransaction(context: ConnectionContext) {
+    proxy_target.commitTransaction(context)
+  }
+
+  def getLastMessageBrokerSequenceId: Long = {
+    return proxy_target.getLastMessageBrokerSequenceId
+  }
+
+  def setBrokerName(brokerName: String) {
+    proxy_target.setBrokerName(brokerName)
+  }
+
+  def rollbackTransaction(context: ConnectionContext) {
+    proxy_target.rollbackTransaction(context)
+  }
+
+  def removeTopicMessageStore(destination: ActiveMQTopic) {
+    proxy_target.removeTopicMessageStore(destination)
+  }
+
+  def getDirectory: File = {
+    return proxy_target.getDirectory
+  }
+
+  def size: Long = {
+    return proxy_target.size
+  }
+
+  def removeQueueMessageStore(destination: ActiveMQQueue) {
+    proxy_target.removeQueueMessageStore(destination)
+  }
+
+  def createQueueMessageStore(destination: ActiveMQQueue): MessageStore = {
+    return proxy_target.createQueueMessageStore(destination)
+  }
+
+  def deleteAllMessages {
+    proxy_target.deleteAllMessages
+  }
+
+  def getDestinations: Set[ActiveMQDestination] = {
+    return proxy_target.getDestinations
+  }
+
+  def rollback(txid: TransactionId) {
+    proxy_target.rollback(txid)
+  }
+
+  def recover(listener: TransactionRecoveryListener) {
+    proxy_target.recover(listener)
+  }
+
+  def prepare(txid: TransactionId) {
+    proxy_target.prepare(txid)
+  }
+
+  def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) {
+    proxy_target.commit(txid, wasPrepared, preCommit, postCommit)
+  }
+
+  def getPList(name: String): PList = {
+    return proxy_target.getPList(name)
+  }
+
+  def removePList(name: String): Boolean = {
+    return proxy_target.removePList(name)
+  }
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala Mon Apr 29 22:33:26 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.leveldb.replicated
 
-import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
+import org.apache.activemq.leveldb.LevelDBStore
 import org.apache.activemq.util.ServiceStopper
 import java.util
 import org.fusesource.hawtdispatch._
@@ -29,13 +29,12 @@ import org.apache.activemq.leveldb.util.
 import FileSupport._
 import java.io.{IOException, RandomAccessFile, File}
 import scala.reflect.BeanProperty
-import java.util.UUID
 
 object SlaveLevelDBStore extends Log
 
 /**
  */
-class SlaveLevelDBStore extends LevelDBStore {
+class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
 
   import SlaveLevelDBStore._
   import ReplicationSupport._
@@ -43,34 +42,14 @@ class SlaveLevelDBStore extends LevelDBS
 
   @BeanProperty
   var connect = "tcp://0.0.0.0:61619"
-  @BeanProperty
-  var securityToken = ""
 
   val queue = createQueue("leveldb replication slave")
   var replay_from = 0L
   var caughtUp = false
 
-  override def createClient = new SlaveLevelDBClient(this)
-  def slave_client = client.asInstanceOf[SlaveLevelDBClient]
-
-  class SlaveLevelDBClient(val store:SlaveLevelDBStore) extends LevelDBClient(store) {
-  }
-
   var wal_session:Session = _
   var transfer_session:Session = _
 
-  def replicaId:String = {
-    val replicaid_file = directory / "replicaid.txt"
-    if( replicaid_file.exists() ) {
-      replicaid_file.readText()
-    } else {
-      val rc = UUID.randomUUID().toString
-      replicaid_file.getParentFile.mkdirs()
-      replicaid_file.writeText(rc)
-      rc
-    }
-  }
-
   override def doStart() = {
     client.init()
 

Added: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java (added)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java Mon Apr 29 22:33:26 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.test;
+
+import junit.framework.TestCase;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.leveldb.CountDownFuture;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
+import org.apache.activemq.leveldb.util.FileSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
+
+/** Tests master election between three ElectingLevelDBStore nodes using an embedded ZooKeeper server.
+ */
+public class ElectingLevelDBStoreTest extends TestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class);
+
+    NIOServerCnxnFactory connector;
+
+    static File data_dir() {
+        return new File("target/activemq-data/leveldb-elections");
+    }
+
+
+    @Override
+    protected void setUp() throws Exception {
+        FileSupport.toRichFile(data_dir()).recursiveDelete();
+
+        System.out.println("Starting ZooKeeper");
+        ZooKeeperServer zk_server = new ZooKeeperServer();
+        zk_server.setTickTime(500);
+        zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
+        connector = new NIOServerCnxnFactory();
+        connector.configure(new InetSocketAddress(0), 100);
+        connector.startup(zk_server);
+        System.out.println("ZooKeeper Started");
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if( connector!=null ) {
+          connector.shutdown();
+          connector = null;
+        }
+    }
+
+    public void testElection() throws Exception {
+
+        ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
+        ArrayList<CountDownFuture> pending_starts = new ArrayList<CountDownFuture>();
+
+        for(String dir: new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
+            ElectingLevelDBStore store = createStoreNode();
+            store.setDirectory(new File(data_dir(), dir));
+            stores.add(store);
+            pending_starts.add(asyncStart(store));
+        }
+
+        // At least one of the stores should have started.
+        CountDownFuture f = waitFor(30 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
+        assertTrue(f!=null);
+        pending_starts.remove(f);
+
+        // The other stores should not start..
+        LOG.info("Making sure the other stores don't start");
+        Thread.sleep(5000);
+        for(CountDownFuture start: pending_starts) {
+            assertFalse(start.completed());
+        }
+
+        // Make sure only one of the stores is reporting to be the master.
+        ElectingLevelDBStore master = null;
+        for(ElectingLevelDBStore store: stores) {
+            if( store.isMaster() ) {
+                assertNull(master);
+                master = store;
+            }
+        }
+        assertNotNull(master);
+
+        // We can work out who the slaves are...
+        HashSet<ElectingLevelDBStore> slaves = new HashSet<ElectingLevelDBStore>(stores);
+        slaves.remove(master);
+
+        // Start sending messages to the master.
+        ArrayList<String> expected_list = new ArrayList<String>();
+        MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        final int TOTAL = 500;
+        for (int i = 0; i < TOTAL; i++) {
+            if (i % ((int) (TOTAL * 0.10)) == 0) {
+                LOG.info("" + (100 * i / TOTAL) + "% done");
+            }
+
+            if( i == 250 ) {
+
+                LOG.info("Checking master state");
+                assertEquals(expected_list, getMessages(ms));
+
+                // Midway through, let's kill the master.
+                LOG.info("Killing Master.");
+                master.stop();
+
+                // At least one of the remaining stores should complete starting.
+                LOG.info("Waiting for slave takeover...");
+                f = waitFor(60 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
+                assertTrue(f!=null);
+                pending_starts.remove(f);
+
+                // Make sure one and only one of the slaves becomes the master..
+                master = null;
+                for(ElectingLevelDBStore store: slaves) {
+                    if( store.isMaster() ) {
+                        assertNull(master);
+                        master = store;
+                    }
+                }
+
+                assertNotNull(master);
+                slaves.remove(master);
+
+                ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+            }
+
+            String msgid = "m:" + i;
+            addMessage(ms, msgid);
+            expected_list.add(msgid);
+        }
+
+        LOG.info("Checking master state");
+        assertEquals(expected_list, getMessages(ms));
+
+        master.stop();
+        for(ElectingLevelDBStore store: stores) {
+            store.stop();
+        }
+    }
+
+    private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
+        long deadline =  System.currentTimeMillis()+timeout;
+        while( true ) {
+            for (CountDownFuture f:futures) {
+                if( f.await(1, TimeUnit.MILLISECONDS) ) {
+                    return f;
+                }
+            }
+            long remaining = deadline - System.currentTimeMillis();
+            if( remaining < 0 ) {
+                return null;
+            } else {
+                Thread.sleep(Math.min(remaining / 10, 100L));
+            }
+        }
+    }
+
+    private CountDownFuture asyncStart(final Service service) {
+        final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+        LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+            public void run() {
+                try {
+                    service.start();
+                    f.set(null);
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    f.set(e);
+                }
+            }
+        });
+        return f;
+    }
+
+    private CountDownFuture asyncStop(final Service service) {
+        final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+        LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+            public void run() {
+                try {
+                    service.stop();
+                    f.set(null);
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    f.set(e);
+                }
+            }
+        });
+        return f;
+    }
+
+    private ElectingLevelDBStore createStoreNode() {
+        ElectingLevelDBStore store = new ElectingLevelDBStore();
+        store.setSecurityToken("foo");
+        store.setLogSize(1023 * 200);
+        store.setMinReplica(1);
+        store.setZkAddress("localhost:" + connector.getLocalPort());
+        store.setZkPath("/broker-stores");
+        store.setBrokerName("foo");
+        store.setBind("tcp://0.0.0.0:0");
+        return store;
+    }
+
+}

Modified: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java Mon Apr 29 22:33:26 2013
@@ -37,8 +37,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
 
 /**
  */
@@ -61,29 +61,29 @@ public class ReplicatedLevelDBStoreTest 
         // Updating the store should not complete since we don't have enough
         // replicas.
         CountDownFuture f = asyncAddMessage(ms, "m1");
-        assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+        assertFalse(f.await(2, TimeUnit.SECONDS));
 
         // Adding a slave should allow that update to complete.
         SlaveLevelDBStore slave = createSlave(master, slaveDir);
         slave.start();
 
-        assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+        assertTrue(f.await(2, TimeUnit.SECONDS));
 
         // New updates should complete quickly now..
         f = asyncAddMessage(ms, "m2");
-        assertTrue(f.completed().await(1, TimeUnit.SECONDS));
+        assertTrue(f.await(1, TimeUnit.SECONDS));
 
         // If the slave goes offline, then updates should once again
         // not complete.
         slave.stop();
 
         f = asyncAddMessage(ms, "m3");
-        assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+        assertFalse(f.await(2, TimeUnit.SECONDS));
 
         // Restart and the op should complete.
         slave = createSlave(master, slaveDir);
         slave.start();
-        assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+        assertTrue(f.await(2, TimeUnit.SECONDS));
 
         master.stop();
         slave.stop();
@@ -91,15 +91,14 @@ public class ReplicatedLevelDBStoreTest 
     }
 
     private CountDownFuture asyncAddMessage(final MessageStore ms, final String body) {
-        final CountDownFuture f = new CountDownFuture(new CountDownLatch(1));
+        final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
         LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
             public void run() {
                 try {
                     addMessage(ms, body);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                } finally {
-                    f.countDown();
+                    f.set(null);
+                } catch (Throwable e) {
+                    f.set(e);
                 }
             }
         });
@@ -114,13 +113,13 @@ public class ReplicatedLevelDBStoreTest 
         directories.add(new File("target/activemq-data/leveldb-node2"));
         directories.add(new File("target/activemq-data/leveldb-node3"));
 
-        for( File f: directories) {
+        for (File f : directories) {
             FileSupport.toRichFile(f).recursiveDelete();
         }
 
         ArrayList<String> expected_list = new ArrayList<String>();
         // We will rotate between 3 nodes the task of being the master.
-        for( int j=0; j < 10; j++) {
+        for (int j = 0; j < 10; j++) {
 
             MasterLevelDBStore master = createMaster(directories.get(0));
             master.start();
@@ -132,11 +131,11 @@ public class ReplicatedLevelDBStoreTest 
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
             final int TOTAL = 500;
             for (int i = 0; i < TOTAL; i++) {
-                if (  i % ((int) (TOTAL * 0.10)) == 0) {
-                    LOG.info("" + (100*i/TOTAL) + "% done");
+                if (i % ((int) (TOTAL * 0.10)) == 0) {
+                    LOG.info("" + (100 * i / TOTAL) + "% done");
                 }
 
-                if( i == 250 ) {
+                if (i == 250) {
                     slave1.start();
                     slave2.stop();
                 }
@@ -149,9 +148,9 @@ public class ReplicatedLevelDBStoreTest 
             LOG.info("Checking master state");
             assertEquals(expected_list, getMessages(ms));
 
-            LOG.info("Stopping master: "+master.replicaId());
+            LOG.info("Stopping master: " + master.replicaId());
             master.stop();
-            LOG.info("Stopping slave: "+slave1.replicaId());
+            LOG.info("Stopping slave: " + slave1.replicaId());
             slave1.stop();
 
             // Rotate the dir order so that slave1 becomes the master next.
@@ -164,7 +163,7 @@ public class ReplicatedLevelDBStoreTest 
         slave1.setDirectory(directory);
         slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
         slave1.setSecurityToken("foo");
-        slave1.setLogSize(1023*200);
+        slave1.setLogSize(1023 * 200);
         return slave1;
     }
 
@@ -178,49 +177,5 @@ public class ReplicatedLevelDBStoreTest 
         return master;
     }
 
-    long id_counter = 0L;
-    String payload = "";
-    {
-        for (int i = 0; i < 1024; i++) {
-            payload += "x";
-        }
-    }
-
-    public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
-        ActiveMQTextMessage message = new ActiveMQTextMessage();
-        message.setPersistent(true);
-        message.setResponseRequired(true);
-        message.setStringProperty("id", body);
-        message.setText(payload);
-        id_counter += 1;
-        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
-        messageId.setBrokerSequenceId(id_counter);
-        message.setMessageId(messageId);
-        ms.addMessage(new ConnectionContext(), message);
-        return message;
-    }
-
-    public ArrayList<String> getMessages(MessageStore ms) throws Exception {
-        final ArrayList<String> rc = new ArrayList<String>();
-        ms.recover(new MessageRecoveryListener() {
-            public boolean recoverMessage(Message message) throws Exception {
-                rc.add(((ActiveMQTextMessage) message).getStringProperty("id"));
-                return true;
-            }
-
-            public boolean hasSpace() {
-                return true;
-            }
-
-            public boolean recoverMessageReference(MessageId ref) throws Exception {
-                return true;
-            }
-
-            public boolean isDuplicate(MessageId ref) {
-                return false;
-            }
-        });
-        return rc;
-    }
 
 }

Added: activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java?rev=1477387&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java (added)
+++ activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java Mon Apr 29 22:33:26 2013
@@ -0,0 +1,62 @@
+package org.apache.activemq.leveldb.test;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+
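+/** Static helpers shared by the replication tests for adding messages to, and listing the ids stored in, a MessageStore.
+ */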
+public class ReplicationTestSupport {
+
+    static long id_counter = 0L; // incremented by addMessage to build each MessageId and broker sequence id
+    static String payload = ""; // message text used by addMessage; filled with 1024 'x' characters below
+    static { // static initializer: an instance initializer would only run on `new ReplicationTestSupport()`, which the static helpers never trigger
+        for (int i = 0; i < 1024; i++) {
+            payload += "x";
+        }
+    }
+
+    static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setStringProperty("id", body);
+        message.setText(payload);
+        id_counter += 1;
+        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
+        messageId.setBrokerSequenceId(id_counter);
+        message.setMessageId(messageId);
+        ms.addMessage(new ConnectionContext(), message);
+        return message;
+    }
+
+    static public ArrayList<String> getMessages(MessageStore ms) throws Exception {
+        final ArrayList<String> rc = new ArrayList<String>();
+        ms.recover(new MessageRecoveryListener() {
+            public boolean recoverMessage(Message message) throws Exception {
+                rc.add(((ActiveMQTextMessage) message).getStringProperty("id"));
+                return true;
+            }
+
+            public boolean hasSpace() {
+                return true;
+            }
+
+            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                return true;
+            }
+
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+        });
+        return rc;
+    }
+}



Mime
View raw message