activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: leveldb replication Master was failing to give up being master after it's process is suspended by using ctrl-z.
Date Tue, 03 Sep 2013 14:41:46 GMT
Updated Branches:
  refs/heads/trunk 8d4fef8af -> 1eca03135


leveldb replication Master was failing to give up being master after it's process is suspended
by using ctrl-z.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1eca0313
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1eca0313
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1eca0313

Branch: refs/heads/trunk
Commit: 1eca0313562c7e1d38b9d6f0c23478438b7f9943
Parents: 8d4fef8
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Tue Sep 3 10:38:30 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Tue Sep 3 10:41:33 2013 -0400

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala | 13 +++++++---
 .../leveldb/replicated/MasterElector.scala      |  4 +++-
 .../leveldb/replicated/MasterLevelDBStore.scala | 14 +++++++++--
 .../replicated/groups/ClusteredSingleton.scala  | 25 +++++++++++++++++++-
 .../groups/internal/ChangeListenerSupport.scala |  6 ++---
 .../groups/internal/ZooKeeperGroup.scala        | 18 ++++++++++----
 6 files changed, 66 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index fb574df..4bbfda5 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -511,12 +511,20 @@ class LevelDBClient(store: LevelDBStore) {
 
   def might_fail[T](func : =>T):T = {
     def handleFailure(e:IOException) = {
+      var failure:Throwable = e;
       if( store.broker_service !=null ) {
         // This should start stopping the broker but it might block,
         // so do it on another thread...
         new Thread("LevelDB IOException handler.") {
           override def run() {
-            store.broker_service.handleIOException(e);
+            try {
+              store.broker_service.handleIOException(e)
+            } catch {
+              case e:RuntimeException =>
+                failure = e
+            } finally {
+              store.stop()
+            }
           }
         }.start()
         // Lets wait until the broker service has started stopping.  Once the
@@ -526,8 +534,7 @@ class LevelDBClient(store: LevelDBStore) {
           Thread.sleep(100);
         }
       }
-      store.stop()
-      throw e;
+      throw failure;
     }
     try {
       func

http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
index 88fcc9d..b5d8e10 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
@@ -85,7 +85,9 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
   object change_listener extends ChangeListener {
 
     def connected = changed
-    def disconnected = changed
+    def disconnected = {
+      changed
+    }
 
     def changed:Unit = elector.synchronized {
 //      info(eid+" cluster state changed: "+members)

http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index ee0d4da..0318eff 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -357,7 +357,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
       return
     }
 
-    if( isStopped ) {
+    if( isStoppedOrStopping ) {
       throw new IllegalStateException("Store replication stopped")
     }
 
@@ -368,13 +368,23 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
     }
 
     while( !position_sync.await(1, TimeUnit.SECONDS) ) {
-      if( isStopped ) {
+      if( isStoppedOrStopping ) {
         throw new IllegalStateException("Store replication stopped")
       }
       warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks,
position, status)
     }
   }
 
+
+  def isStoppedOrStopping: Boolean = {
+    if( isStopped || isStopping )
+      return true
+    if( broker_service!=null && broker_service.isStopping )
+      return true
+    false
+  }
+
+
   def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
     if( length > 0 ) {
       val value = new LogWrite

http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
index 20b3d33..40eadf7 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap
 import java.lang.{IllegalStateException, String}
 import reflect.BeanProperty
 import org.codehaus.jackson.annotate.JsonProperty
+import org.apache.zookeeper.KeeperException.NoNodeException
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -108,16 +109,20 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T])
extends
     }
 
     def connected = {
+      onConnected
       changed
       ClusteredSingletonWatcher.this.fireConnected
     }
 
     def disconnected = {
+      onDisconnected
       changed
       ClusteredSingletonWatcher.this.fireDisconnected
     }
   }
 
+  protected def onConnected = {}
+  protected def onDisconnected = {}
 
   def start(group:Group) = this.synchronized {
     if(_group !=null )
@@ -223,8 +228,26 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends
Clustered
 
     if(_group==null)
       throw new IllegalStateException("Not started.")
+
     this._state = state
-    _group.update(_eid, encode(state, mapper))
+    try {
+      _group.update(_eid, encode(state, mapper))
+    } catch {
+      case e:NoNodeException =>
+        this._state = null.asInstanceOf[T]
+        join(state)
+    }
+  }
+
+  override protected def onDisconnected {
+    this._eid = null
+  }
+
+  override protected def onConnected {
+    if( this.eid==null && this._state!=null ) {
+      this._state = null.asInstanceOf[T]
+      join(this._state)
+    }
   }
 
   def isMaster:Boolean = this.synchronized {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
index 20c50ba..763059a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
@@ -51,7 +51,7 @@ trait ChangeListenerSupport {
   }
 
   def fireConnected() = {
-    val listener = this.synchronized { this.listeners }
+    val listeners = this.synchronized { this.listeners }
     check_elapsed_time {
       for (listener <- listeners) {
         listener.connected
@@ -60,7 +60,7 @@ trait ChangeListenerSupport {
   }
 
   def fireDisconnected() = {
-    val listener = this.synchronized { this.listeners }
+    val listeners = this.synchronized { this.listeners }
     check_elapsed_time {
       for (listener <- listeners) {
         listener.disconnected
@@ -69,7 +69,7 @@ trait ChangeListenerSupport {
   }
 
   def fireChanged() = {
-    val listener = this.synchronized { this.listeners }
+    val listeners = this.synchronized { this.listeners }
     val start = System.nanoTime()
     check_elapsed_time {
       for (listener <- listeners) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
index 9df9125..f416013 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
@@ -91,7 +91,10 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group
with Life
 
   def connected = zk.isConnected
   def onConnected() = fireConnected()
-  def onDisconnected() = fireDisconnected()
+  def onDisconnected() = {
+    this.members = new LinkedHashMap()
+    fireDisconnected()
+  }
 
   def join(data:Array[Byte]=null): String = this.synchronized {
     val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
@@ -102,9 +105,16 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group
with Life
   def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
     joins.get(path) match {
       case Some(ver) =>
-        val stat = zk.setData(member_path_prefix+path, data, ver)
-        joins.put(path, stat.getVersion)
-      case None => throw new IllegalArgumentException("Has not joined locally: "+path)
+        try {
+          val stat = zk.setData(member_path_prefix + path, data, ver)
+          joins.put(path, stat.getVersion)
+        }
+        catch {
+          case e:NoNodeException =>
+            joins.remove(path)
+            throw e;
+        }
+      case None => throw new NoNodeException("Has not joined locally: "+path)
     }
   }
 


Mime
View raw message