activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Fixing bug which caused replicated leveldb nodes to not recover from a ZooKeeper failure.
Date Wed, 09 Oct 2013 14:56:39 GMT
Updated Branches:
  refs/heads/trunk 9ee65d321 -> 5e63ddd33


Fixing bug which caused replicated leveldb nodes to not recover from a ZooKeeper failure.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5e63ddd3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5e63ddd3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5e63ddd3

Branch: refs/heads/trunk
Commit: 5e63ddd337cd1df4c23c052a0c9c117a4e4e5a76
Parents: 9ee65d3
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Wed Oct 9 10:56:20 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Wed Oct 9 10:56:34 2013 -0400

----------------------------------------------------------------------
 .../replicated/ElectingLevelDBStore.scala       |  26 ++-
 .../leveldb/replicated/MasterElector.scala      |  17 +-
 .../replicated/groups/ChangeListener.scala      | 107 +++++++++++
 .../replicated/groups/ClusteredSingleton.scala  |  57 +++---
 .../leveldb/replicated/groups/Group.scala       | 109 -----------
 .../replicated/groups/ZooKeeperGroup.scala      | 191 +++++++++++++++++++
 .../groups/internal/ChangeListenerSupport.scala |  94 ---------
 .../groups/internal/ZooKeeperGroup.scala        | 175 -----------------
 .../leveldb/test/ElectingLevelDBStoreTest.java  |  70 +++++++
 9 files changed, 421 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
index 7c82dd5..c397760 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
@@ -18,8 +18,8 @@ package org.apache.activemq.leveldb.replicated
 
 import org.linkedin.util.clock.Timespan
 import scala.reflect.BeanProperty
-import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
-import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
+import org.apache.activemq.util.ServiceStopper
+import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
 import java.net.{NetworkInterface, InetAddress}
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.broker.{LockableServiceSupport, Locker}
@@ -30,11 +30,9 @@ import org.apache.activemq.leveldb.util.Log
 import java.io.File
 import org.apache.activemq.usage.SystemUsage
 import org.apache.activemq.ActiveMQMessageAuditNoSync
-import org.fusesource.hawtdispatch
 import org.apache.activemq.broker.jmx.{OpenTypeSupport, BrokerMBeanSupport, AnnotatedMBean}
-import org.apache.activemq.leveldb.LevelDBStore._
 import javax.management.ObjectName
-import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
+import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeData}
 import java.util
 import org.apache.activemq.leveldb.replicated.groups._
 
@@ -138,7 +136,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
   var slave: SlaveLevelDBStore = _
 
   var zk_client: ZKClient = _
-  var zk_group: Group = _
+  var zk_group: ZooKeeperGroup = _
 
   var position: Long = -1L
 
@@ -270,6 +268,22 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
     zk_group.close
     zk_client.close()
     zk_client = null
+
+    if( master!=null ) {
+      val latch = new CountDownLatch(1)
+      stop_master {
+        latch.countDown()
+      }
+      latch.await()
+    }
+    if( slave !=null ) {
+      val latch = new CountDownLatch(1)
+      stop_slave {
+        latch.countDown()
+      }
+      latch.await()
+
+    }
     if( master_started.get() ) {
       stopped_latch.countDown()
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
index c1c4c0c..8462f39 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
@@ -3,6 +3,7 @@ package org.apache.activemq.leveldb.replicated
 import org.apache.activemq.leveldb.replicated.groups._
 import org.codehaus.jackson.annotate.JsonProperty
 import org.apache.activemq.leveldb.util.{Log, JsonCodec}
+import java.io.IOException
 
 
 class LevelDBNodeState extends NodeState {
@@ -67,7 +68,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
     var next = create_state
     if (next != last_state) {
       last_state = next
-      update(next)
+      join(next)
     }
   }
 
@@ -89,6 +90,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
       changed
     }
 
+    var stopped = false;
     def changed:Unit = elector.synchronized {
       debug("ZooKeeper group changed: %s", members)
 
@@ -139,7 +141,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
         elected = null
       }
 
-      val master_elected = master.map(_.elected).getOrElse(null) 
+      val master_elected = if(eid==null) null else master.map(_.elected).getOrElse(null)
 
       // If no master is currently elected, we need to report our current store position.
       // Since that will be used to select the master.
@@ -155,7 +157,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
       }
 
       // Do we need to stop the running master?
-      if (master_elected != eid && address != null && !updating_store) {
+      if ((eid==null || master_elected != eid) && address!=null && !updating_store) {
         info("Demoted to slave")
         updating_store = true
         store.stop_master {
@@ -169,7 +171,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
       }
 
       // Have we been promoted to being the master?
-      if (master_elected == eid && address==null && !updating_store ) {
+      if (eid!=null && master_elected == eid && address==null && !updating_store ) {
         info("Promoted to master")
         updating_store = true
         store.start_master { port =>
@@ -183,7 +185,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
       }
 
       // Can we become a slave?
-      if (master_elected != eid && address == null) {
+      if ( (eid==null || master_elected != eid) && address == null) {
         // Did the master address change?
         if (connect_target != connected_address) {
 
@@ -214,8 +216,9 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
           }
         }
       }
-
-      update
+      if( group.zk.isConnected ) {
+        update
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ChangeListener.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ChangeListener.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ChangeListener.scala
new file mode 100644
index 0000000..d76ade8
--- /dev/null
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ChangeListener.scala
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups
+
+import org.slf4j.{Logger, LoggerFactory}
+import java.util.concurrent.TimeUnit
+
+
+/**
+ * <p>
+ *   Callback interface used to get notifications of changes
+ *   to a cluster group.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ChangeListener {
+  def changed:Unit
+  def connected:Unit
+  def disconnected:Unit
+}
+
+object ChangeListenerSupport {
+    val LOG: Logger = LoggerFactory.getLogger(classOf[ChangeListenerSupport])
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ChangeListenerSupport {
+
+  var listeners = List[ChangeListener]()
+
+  def connected:Boolean
+
+  def add(listener: ChangeListener): Unit = {
+    val connected = this.synchronized {
+      listeners ::= listener
+      this.connected
+    }
+    if (connected) {
+      listener.connected
+    }
+  }
+
+  def remove(listener: ChangeListener): Unit = this.synchronized {
+    listeners = listeners.filterNot(_ == listener)
+  }
+
+  def fireConnected() = {
+    val listeners = this.synchronized { this.listeners }
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.connected
+      }
+    }
+  }
+
+  def fireDisconnected() = {
+    val listeners = this.synchronized { this.listeners }
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.disconnected
+      }
+    }
+  }
+
+  def fireChanged() = {
+    val listeners = this.synchronized { this.listeners }
+    val start = System.nanoTime()
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.changed
+      }
+    }
+  }
+
+  def check_elapsed_time[T](func: => T):T = {
+    val start = System.nanoTime()
+    try {
+      func
+    } finally {
+      val end = System.nanoTime()
+      val elapsed = TimeUnit.NANOSECONDS.toMillis(end-start)
+      if( elapsed > 100 ) {
+        ChangeListenerSupport.LOG.warn("listeners are taking too long to process the events")
+      }
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
index 40eadf7..18ef167 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
@@ -18,7 +18,6 @@ package org.apache.activemq.leveldb.replicated.groups
 
 
 import collection.mutable.{ListBuffer, HashMap}
-import internal.ChangeListenerSupport
 
 import java.io._
 import org.codehaus.jackson.map.ObjectMapper
@@ -83,7 +82,7 @@ object ClusteredSupport {
 class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends ChangeListenerSupport {
   import ClusteredSupport._
   
-  protected var _group:Group = _
+  protected var _group:ZooKeeperGroup = _
   def group = _group
 
   /**
@@ -124,7 +123,7 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends
   protected def onConnected = {}
   protected def onDisconnected = {}
 
-  def start(group:Group) = this.synchronized {
+  def start(group:ZooKeeperGroup) = this.synchronized {
     if(_group !=null )
       throw new IllegalStateException("Already started.")
     _group = group
@@ -186,7 +185,7 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
 
   override def stop = {
     this.synchronized {
-      if(_eid != null) {
+      if(_state != null) {
         leave
       }
       super.stop
@@ -200,52 +199,42 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
       throw new IllegalArgumentException("The state id cannot be null")
     if(_group==null)
       throw new IllegalStateException("Not started.")
-    if(this._state!=null)
-      throw new IllegalStateException("Already joined")
     this._state = state
-    _eid = group.join(encode(state, mapper))
-  }
 
-  def leave:Unit = this.synchronized {
-    if(this._state==null)
-      throw new IllegalStateException("Not joined")
-    if(_group==null)
-      throw new IllegalStateException("Not started.")
-    _group.leave(_eid)
-    _eid = null
-    this._state = null.asInstanceOf[T]
+    while( connected ) {
+      if( _eid == null ) {
+        _eid = group.join(encode(state, mapper))
+        return;
+      } else {
+        try {
+          _group.update(_eid, encode(state, mapper))
+          return;
+        } catch {
+          case e:NoNodeException =>
+            this._eid = null;
+        }
+      }
+    }
   }
 
-  def update(state:T) = this.synchronized {
+  def leave:Unit = this.synchronized {
     if(this._state==null)
       throw new IllegalStateException("Not joined")
-    if(state==null)
-      throw new IllegalArgumentException("State cannot be null")
-    if(state.id==null)
-      throw new IllegalArgumentException("The state id cannot be null")
-    if(state.id!=this._state.id)
-      throw new IllegalArgumentException("The state id cannot change")
-
     if(_group==null)
       throw new IllegalStateException("Not started.")
 
-    this._state = state
-    try {
-      _group.update(_eid, encode(state, mapper))
-    } catch {
-      case e:NoNodeException =>
-        this._state = null.asInstanceOf[T]
-        join(state)
+    this._state = null.asInstanceOf[T]
+    if( _eid!=null && connected ) {
+      _group.leave(_eid)
+      _eid = null
     }
   }
 
   override protected def onDisconnected {
-    this._eid = null
   }
 
   override protected def onConnected {
-    if( this.eid==null && this._state!=null ) {
-      this._state = null.asInstanceOf[T]
+    if( this._state!=null ) {
       join(this._state)
     }
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala
deleted file mode 100644
index 1ace011..0000000
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.leveldb.replicated.groups
-
-import internal.ZooKeeperGroup
-import org.apache.zookeeper.data.ACL
-import org.apache.zookeeper.ZooDefs.Ids
-import java.util.LinkedHashMap
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object ZooKeeperGroupFactory {
-
-  def create(zk: ZKClient, path: String):Group = new ZooKeeperGroup(zk, path)
-  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
-}
-
-/**
- * <p>
- *   Used the join a cluster group and to monitor the memberships
- *   of that group.
- * </p>
- * <p>
- *   This object is not thread safe.  You should are responsible for
- *   synchronizing access to it across threads.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait Group {
-
-  /**
-   * Adds a member to the group with some associated data.
-   */
-  def join(data:Array[Byte]):String
-
-  /**
-   * Updates the data associated with joined member.
-   */
-  def update(id:String, data:Array[Byte]):Unit
-
-  /**
-   * Removes a previously added member.
-   */
-  def leave(id:String):Unit
-
-  /**
-   * Lists all the members currently in the group.
-   */
-  def members:java.util.LinkedHashMap[String, Array[Byte]]
-
-  /**
-   * Registers a change listener which will be called
-   * when the cluster membership changes.
-   */
-  def add(listener:ChangeListener)
-
-  /**
-   * Removes a previously added change listener.
-   */
-  def remove(listener:ChangeListener)
-
-  /**
-   * A group should be closed to release aquired resources used
-   * to monitor the group membership.
-   *
-   * Whe the Group is closed, any memberships registered via this
-   * Group will be removed from the group.
-   */
-  def close:Unit
-
-  /**
-   * Are we connected with the cluster?
-   */
-  def connected:Boolean
-}
-
-/**
- * <p>
- *   Callback interface used to get notifications of changes
- *   to a cluster group.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait ChangeListener {
-  def changed:Unit
-  def connected:Unit
-  def disconnected:Unit
-}
-

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
new file mode 100644
index 0000000..39399d1
--- /dev/null
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups
+
+import org.apache.zookeeper._
+import org.linkedin.zookeeper.tracker._
+import scala.collection.mutable.HashMap
+import org.linkedin.zookeeper.client.LifecycleListener
+import collection.JavaConversions._
+import java.util.{LinkedHashMap, Collection}
+import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException}
+import scala.Predef._
+import scala.Some
+
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZooKeeperGroupFactory {
+
+  def create(zk: ZKClient, path: String):ZooKeeperGroup = new ZooKeeperGroup(zk, path)
+  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
+}
+
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZooKeeperGroup {
+  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = {
+    var rc = new LinkedHashMap[String, Array[Byte]]
+    zk.getAllChildren(path).sortWith((a,b)=> a < b).foreach { node =>
+      try {
+        if( node.matches("""0\d+""") ) {
+          rc.put(node, zk.getData(path+"/"+node))
+        } else {
+          None
+        }
+      } catch {
+        case e:Throwable =>
+          e.printStackTrace
+      }
+    }
+    rc
+
+  }
+
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport {
+
+  val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
+  val joins = HashMap[String, Int]()
+
+  var members = new LinkedHashMap[String, Array[Byte]]
+
+  private def member_path_prefix = root + "/0"
+
+  zk.registerListener(this)
+
+  create(root)
+  tree.track(new NodeEventsListener[Array[Byte]]() {
+    def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
+      if( !closed )
+        fire_cluster_change
+    }
+  })
+  fire_cluster_change
+
+  @volatile
+  var closed = false
+
+  def close = this.synchronized {
+    closed = true
+    joins.foreach { case (path, version) =>
+      try {
+        if( zk.isConnected ) {
+          zk.delete(member_path_prefix + path, version)
+        }
+      } catch {
+        case x:NoNodeException => // Already deleted.
+      }
+    }
+    joins.clear
+    tree.destroy
+    zk.removeListener(this)
+  }
+
+  def connected = zk.isConnected
+  def onConnected() = fireConnected()
+  def onDisconnected() = {
+    this.members = new LinkedHashMap()
+    fireDisconnected()
+  }
+
+  def join(data:Array[Byte]=null): String = this.synchronized {
+    val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
+    joins.put(id, 0)
+    id
+  }
+
+  def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
+    joins.get(path) match {
+      case Some(ver) =>
+        try {
+          val stat = zk.setData(member_path_prefix + path, data, ver)
+          joins.put(path, stat.getVersion)
+        }
+        catch {
+          case e:NoNodeException =>
+            joins.remove(path)
+            throw e;
+        }
+      case None => throw new NoNodeException("Has not joined locally: "+path)
+    }
+  }
+
+  def leave(path:String): Unit = this.synchronized {
+    joins.remove(path).foreach {
+      case version =>
+          try {
+            zk.delete(member_path_prefix + path, version)
+          } catch {
+            case x: NoNodeException => // Already deleted.
+            case x: ConnectionLossException => // disconnected
+          }
+    }
+  }
+
+  private def fire_cluster_change: Unit = {
+    this.synchronized {
+      val t = tree.getTree.toList.filterNot { x =>
+      // don't include the root node, or nodes that don't match our naming convention.
+        (x._1 == root) || !x._1.stripPrefix(root).matches("""/0\d+""")
+      }
+
+      this.members = new LinkedHashMap()
+      t.sortWith((a,b)=> a._1 < b._1 ).foreach { x=>
+        this.members.put(x._1.stripPrefix(member_path_prefix), x._2.getData)
+      }
+    }
+    fireChanged()
+  }
+
+  private def create(path: String, count : java.lang.Integer = 0): Unit = {
+    try {
+      if (zk.exists(path, false) != null) {
+        return
+      }
+      try {
+        // try create given path in persistent mode
+        zk.createOrSetWithParents(path, "", CreateMode.PERSISTENT)
+      } catch {
+        case ignore: KeeperException.NodeExistsException =>
+      }
+    } catch {
+      case ignore : KeeperException.SessionExpiredException => {
+        if (count > 20) {
+          // we tried enought number of times
+          throw new IllegalStateException("Cannot create path " + path, ignore)
+        }
+        // try to create path with increased counter value
+        create(path, count + 1)
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
deleted file mode 100644
index 763059a..0000000
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.leveldb.replicated.groups.internal
-
-import org.apache.activemq.leveldb.replicated.groups.ChangeListener
-import org.slf4j.{Logger, LoggerFactory}
-import java.util.concurrent.TimeUnit
-
-
-object ChangeListenerSupport {
-    val LOG: Logger = LoggerFactory.getLogger(classOf[ChangeListenerSupport])
-}
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait ChangeListenerSupport {
-
-  var listeners = List[ChangeListener]()
-
-  def connected:Boolean
-
-  def add(listener: ChangeListener): Unit = {
-    val connected = this.synchronized {
-      listeners ::= listener
-      this.connected
-    }
-    if (connected) {
-      listener.connected
-    }
-  }
-
-  def remove(listener: ChangeListener): Unit = this.synchronized {
-    listeners = listeners.filterNot(_ == listener)
-  }
-
-  def fireConnected() = {
-    val listeners = this.synchronized { this.listeners }
-    check_elapsed_time {
-      for (listener <- listeners) {
-        listener.connected
-      }
-    }
-  }
-
-  def fireDisconnected() = {
-    val listeners = this.synchronized { this.listeners }
-    check_elapsed_time {
-      for (listener <- listeners) {
-        listener.disconnected
-      }
-    }
-  }
-
-  def fireChanged() = {
-    val listeners = this.synchronized { this.listeners }
-    val start = System.nanoTime()
-    check_elapsed_time {
-      for (listener <- listeners) {
-        listener.changed
-      }
-    }
-  }
-
-  def check_elapsed_time[T](func: => T):T = {
-    val start = System.nanoTime()
-    try {
-      func
-    } finally {
-      val end = System.nanoTime()
-      val elapsed = TimeUnit.NANOSECONDS.toMillis(end-start)
-      if( elapsed > 100 ) {
-        ChangeListenerSupport.LOG.warn("listeners are taking too long to process the events")
-      }
-    }
-  }
-  
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
deleted file mode 100644
index a44cc22..0000000
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.leveldb.replicated.groups.internal
-
-import org.apache.zookeeper._
-import java.lang.String
-import org.linkedin.zookeeper.tracker._
-import org.apache.activemq.leveldb.replicated.groups.{ZKClient, ChangeListener, Group}
-import scala.collection.mutable.HashMap
-import org.linkedin.zookeeper.client.LifecycleListener
-import collection.JavaConversions._
-import java.util.{LinkedHashMap, Collection}
-import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException, Code}
-
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object ZooKeeperGroup {
-  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = {
-    var rc = new LinkedHashMap[String, Array[Byte]]
-    zk.getAllChildren(path).sortWith((a,b)=> a < b).foreach { node =>
-      try {
-        if( node.matches("""0\d+""") ) {
-          rc.put(node, zk.getData(path+"/"+node))
-        } else {
-          None
-        }
-      } catch {
-        case e:Throwable =>
-          e.printStackTrace
-      }
-    }
-    rc
-
-  }
-
-
-}
-
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with LifecycleListener with ChangeListenerSupport {
-
-  val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
-  val joins = HashMap[String, Int]()
-
-  var members = new LinkedHashMap[String, Array[Byte]]
-
-  private def member_path_prefix = root + "/0"
-
-  zk.registerListener(this)
-
-  create(root)
-  tree.track(new NodeEventsListener[Array[Byte]]() {
-    def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
-      if( !closed )
-        fire_cluster_change
-    }
-  })
-  fire_cluster_change
-
-  @volatile
-  var closed = false
-
-  def close = this.synchronized {
-    closed = true
-    joins.foreach { case (path, version) =>
-      try {
-        zk.delete(member_path_prefix + path, version)
-      } catch {
-        case x:NoNodeException => // Already deleted.
-      }
-    }
-    joins.clear
-    tree.destroy
-    zk.removeListener(this)
-  }
-
-  def connected = zk.isConnected
-  def onConnected() = fireConnected()
-  def onDisconnected() = {
-    this.members = new LinkedHashMap()
-    fireDisconnected()
-  }
-
-  def join(data:Array[Byte]=null): String = this.synchronized {
-    val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
-    joins.put(id, 0)
-    id
-  }
-
-  def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
-    joins.get(path) match {
-      case Some(ver) =>
-        try {
-          val stat = zk.setData(member_path_prefix + path, data, ver)
-          joins.put(path, stat.getVersion)
-        }
-        catch {
-          case e:NoNodeException =>
-            joins.remove(path)
-            throw e;
-        }
-      case None => throw new NoNodeException("Has not joined locally: "+path)
-    }
-  }
-
-  def leave(path:String): Unit = this.synchronized {
-    joins.remove(path).foreach {
-      case version =>
-          try {
-            zk.delete(member_path_prefix + path, version)
-          } catch {
-            case x: NoNodeException => // Already deleted.
-            case x: ConnectionLossException => // disconnected
-          }
-    }
-  }
-
-  private def fire_cluster_change: Unit = {
-    this.synchronized {
-      val t = tree.getTree.toList.filterNot { x =>
-      // don't include the root node, or nodes that don't match our naming convention.
-        (x._1 == root) || !x._1.stripPrefix(root).matches("""/0\d+""")
-      }
-
-      this.members = new LinkedHashMap()
-      t.sortWith((a,b)=> a._1 < b._1 ).foreach { x=>
-        this.members.put(x._1.stripPrefix(member_path_prefix), x._2.getData)
-      }
-    }
-    fireChanged()
-  }
-
-  private def create(path: String, count : java.lang.Integer = 0): Unit = {
-    try {
-      if (zk.exists(path, false) != null) {
-        return
-      }
-      try {
-        // try create given path in persistent mode
-        zk.createOrSetWithParents(path, "", CreateMode.PERSISTENT)
-      } catch {
-        case ignore: KeeperException.NodeExistsException =>
-      }
-    } catch {
-      case ignore : KeeperException.SessionExpiredException => {
-        if (count > 20) {
-          // we tried enought number of times
-          throw new IllegalStateException("Cannot create path " + path, ignore)
-        }
-        // try to create path with increased counter value
-        create(path, count + 1)
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e63ddd3/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
index 34fbe08..93bf051 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
@@ -167,6 +167,76 @@ public class ElectingLevelDBStoreTest {
         }
     }
 
+    @Test(timeout = 1000 * 60 * 60)
+    public void testZooKeeperServerFailure() throws Exception {
+
+        final ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
+        ArrayList<CountDownFuture> pending_starts = new ArrayList<CountDownFuture>();
+
+        for (String dir : new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
+            ElectingLevelDBStore store = createStoreNode();
+            store.setDirectory(new File(data_dir(), dir));
+            stores.add(store);
+            pending_starts.add(asyncStart(store));
+        }
+
+        // At least one of the stores should have started.
+        CountDownFuture f = waitFor(30 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
+        assertTrue(f != null);
+        pending_starts.remove(f);
+
+        // The other stores should not start..
+        LOG.info("Making sure the other stores don't start");
+        Thread.sleep(5000);
+        for (CountDownFuture start : pending_starts) {
+            assertFalse(start.completed());
+        }
+
+        // Stop ZooKeeper..
+        LOG.info("SHUTTING DOWN ZooKeeper!");
+        connector.shutdown();
+
+        // None of the store should be slaves...
+        within( 30, TimeUnit.SECONDS, new Task(){
+            public void run() throws Exception {
+                for (ElectingLevelDBStore store : stores) {
+                    assertFalse(store.isMaster());
+                }
+            }
+        });
+
+        for (ElectingLevelDBStore store : stores) {
+            store.stop();
+        }
+    }
+
+    static interface Task {
+        public void run() throws Exception;
+    }
+
+    private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
+        long timeMS = unit.toMillis(time);
+        long deadline = System.currentTimeMillis() + timeMS;
+        while (true) {
+            try {
+                task.run();
+                return;
+            } catch (Throwable e) {
+                long remaining = deadline - System.currentTimeMillis();
+                if( remaining <=0 ) {
+                    if( e instanceof RuntimeException ) {
+                        throw (RuntimeException)e;
+                    }
+                    if( e instanceof Error ) {
+                        throw (Error)e;
+                    }
+                    throw new RuntimeException(e);
+                }
+                Thread.sleep(Math.min(timeMS/10, remaining));
+            }
+        }
+    }
+
     private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
         long deadline =  System.currentTimeMillis()+timeout;
         while( true ) {


Mime
View raw message