activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cepo...@apache.org
Subject activemq git commit: Fix for https://issues.apache.org/jira/browse/AMQ-5082 ActiveMQ replicatedLevelDB cluster breaks, all nodes stop listening.
Date Tue, 31 Mar 2015 00:07:50 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 351d4b9de -> a39e51e05


Fix for https://issues.apache.org/jira/browse/AMQ-5082 ActiveMQ replicatedLevelDB cluster
breaks, all nodes stop listening.

Many thanks to Jim Robinson (jim.robinson@gmail.com) for the patch!


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a39e51e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a39e51e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a39e51e0

Branch: refs/heads/master
Commit: a39e51e0519852332f7779443c3db09b6e691d49
Parents: 351d4b9
Author: Christian Posta <christian.posta@gmail.com>
Authored: Mon Mar 30 17:07:12 2015 -0700
Committer: Christian Posta <christian.posta@gmail.com>
Committed: Mon Mar 30 17:07:42 2015 -0700

----------------------------------------------------------------------
 .../replicated/groups/ZooKeeperGroup.scala      |  30 ++++--
 .../leveldb/test/ElectingLevelDBStoreTest.java  |  61 +++++++++++
 .../leveldb/test/ZooKeeperTestSupport.java      |   6 +-
 .../zookeeper/server/TestServerCnxnFactory.java | 101 +++++++++++++++++++
 4 files changed, 188 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a39e51e0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
index 39399d1..99808ed 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ZooKeeperGroup.scala
@@ -72,7 +72,9 @@ object ZooKeeperGroup {
  */
 class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport {
 
-  val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
+  var tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
+  var rebuildTree = false
+
   val joins = HashMap[String, Int]()
 
   var members = new LinkedHashMap[String, Array[Byte]]
@@ -82,12 +84,13 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
   zk.registerListener(this)
 
   create(root)
-  tree.track(new NodeEventsListener[Array[Byte]]() {
+  var treeEventHandler = new NodeEventsListener[Array[Byte]]() {
     def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
       if( !closed )
-        fire_cluster_change
+        fire_cluster_change;
     }
-  })
+  }
+  tree.track(treeEventHandler)
   fire_cluster_change
 
   @volatile
@@ -110,7 +113,21 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
   }
 
   def connected = zk.isConnected
-  def onConnected() = fireConnected()
+  def onConnected() = {
+    this.synchronized {
+      // underlying ZooKeeperTreeTracker isn't rebuilding itself after
+      // the loss of the session, so we need to destroy/rebuild it on
+      // reconnect.
+      if (rebuildTree) {
+        tree.destroy
+        tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
+        tree.track(treeEventHandler)
+      } else {
+        rebuildTree = true
+      }
+    }
+    fireConnected()
+  }
   def onDisconnected() = {
     this.members = new LinkedHashMap()
     fireDisconnected()
@@ -187,5 +204,4 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListen
       }
     }
   }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/a39e51e0/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
index 8dcaa8e..4d852de 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
@@ -182,6 +182,67 @@ public class ElectingLevelDBStoreTest extends ZooKeeperTestSupport {
         });
     }
 
+    /*
+     * testAMQ5082 tests the behavior of an ElectingLevelDBStore
+     * pool when ZooKeeper I/O timeouts occur. See issue AMQ-5082.
+     */
+    @Test(timeout = 1000 * 60 * 5)
+    public void testAMQ5082() throws Throwable {
+        final ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
+
+        LOG.info("Launching 3 stores");
+        for (String dir : new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
+            ElectingLevelDBStore store = createStoreNode();
+            store.setDirectory(new File(data_dir(), dir));
+            stores.add(store);
+            asyncStart(store);
+        }
+
+        LOG.info("Waiting 30s for stores to start");
+        Thread.sleep(30 * 1000);
+
+        LOG.info("Checking for a single master");
+        ElectingLevelDBStore master = null;
+        for (ElectingLevelDBStore store: stores) {
+            if (store.isMaster()) {
+                assertNull(master);
+                master = store;
+            }
+        }
+        assertNotNull(master);
+
+        LOG.info("Imposing 1s I/O wait on Zookeeper connections, waiting 30s to confirm that quorum is not lost");
+        this.connector.testHandle.setIOWaitMillis(1 * 1000, 30 * 1000);
+
+        LOG.info("Confirming that the quorum has not been lost");
+        for (ElectingLevelDBStore store: stores) {
+            if (store.isMaster()) {
+                assertTrue(master == store);
+            }
+        }
+
+        LOG.info("Imposing 11s I/O wait on Zookeeper connections, waiting 30s for quorum to be lost");
+        this.connector.testHandle.setIOWaitMillis(11 * 1000, 30 * 1000);
+
+        LOG.info("Confirming that the quorum has been lost");
+        for (ElectingLevelDBStore store: stores) {
+            assertFalse(store.isMaster());
+        }
+        master = null;
+
+        LOG.info("Lifting I/O wait on Zookeeper connections, waiting 30s for quorum to be re-established");
+        this.connector.testHandle.setIOWaitMillis(0, 30 * 1000);
+
+        LOG.info("Checking for a single master");
+        for (ElectingLevelDBStore store: stores) {
+            if (store.isMaster()) {
+                assertNull(master);
+                master = store;
+            }
+        }
+        assertNotNull(master);
+    }
+
     @After
     public void stop() throws Exception {
         if (master != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a39e51e0/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
index 7498d98..34422be 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
@@ -19,7 +19,7 @@ package org.apache.activemq.leveldb.test;
 import org.apache.activemq.leveldb.CountDownFuture;
 import org.apache.activemq.leveldb.util.FileSupport;
 import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.TestServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.junit.After;
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class ZooKeeperTestSupport {
 
-    protected NIOServerCnxnFactory connector;
+    protected TestServerCnxnFactory connector;
 
     static File data_dir() {
         return new File("target/activemq-data/leveldb-elections");
@@ -49,7 +49,7 @@ public class ZooKeeperTestSupport {
         ZooKeeperServer zk_server = new ZooKeeperServer();
         zk_server.setTickTime(500);
         zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
-        connector = new NIOServerCnxnFactory();
+        connector = new TestServerCnxnFactory();
         connector.configure(new InetSocketAddress(0), 100);
         connector.startup(zk_server);
         System.out.println("ZooKeeper started");

http://git-wip-us.apache.org/repos/asf/activemq/blob/a39e51e0/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java b/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java
new file mode 100644
index 0000000..cc0adca
--- /dev/null
+++ b/activemq-leveldb-store/src/test/java/org/apache/zookeeper/server/TestServerCnxnFactory.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * TestServerCnxnFactory allows a caller to impose an artifical
+ * wait on I/O over the ServerCnxn used to communicate with the
+ * ZooKeeper server.
+ */
+public class TestServerCnxnFactory extends NIOServerCnxnFactory {
+    protected static final Logger LOG = LoggerFactory.getLogger(TestServerCnxnFactory.class);
+
+    /* testHandle controls whehter or not an artifical wait
+     * is imposed when talking to the ZooKeeper server
+    */
+    public TestHandle testHandle = new TestHandle();
+
+    public TestServerCnxnFactory() throws IOException {
+        super();
+    }
+
+    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException {
+        return new TestServerCnxn(this.zkServer, sock, sk, this, testHandle);
+    }
+
+    /*
+     * TestHandle is handed to TestServerCnxn and is used to
+     * control the amount of time the TestServerCnxn waits
+     * before allowing an I/O operation.
+     */
+    public class TestHandle {
+        private Object mu = new Object();
+        private int ioWaitMillis = 0;
+
+        /*
+         * Set an artifical I/O wait (in milliseconds) on ServerCnxn and
+         * then sleep for the specified number of milliseconds.
+         */
+        public void setIOWaitMillis(int ioWaitMillis, int sleepMillis) {
+            synchronized(mu) {
+                this.ioWaitMillis = ioWaitMillis;
+            }
+            if (sleepMillis > 0) {
+                try {
+                    Thread.sleep(sleepMillis);
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        /*
+         * Get the number of milliseconds to wait before
+         * allowing ServerCnxn to perform I/O.
+         */
+        public int getIOWaitMillis() {
+            synchronized(mu) {
+                return this.ioWaitMillis;
+            }
+        }
+    }
+
+    public class TestServerCnxn extends NIOServerCnxn {
+        public TestHandle testHandle;
+
+        public TestServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, TestHandle testHandle) throws IOException {
+            super(zk, sock, sk, factory);
+            this.testHandle = testHandle;
+        }
+
+        public void doIO(SelectionKey k) throws InterruptedException {
+            final int millis = this.testHandle.getIOWaitMillis();
+            if (millis > 0) {
+                LOG.info("imposing a "+millis+" millisecond wait on ServerCxn: "+this);
+                try {
+                    Thread.sleep(millis);
+                } catch (InterruptedException e) {}
+            }
+            super.doIO(k);
+        }
+    }
+}


Mime
View raw message