activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [12/20] git commit: Working on a test case for https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.
Date Wed, 12 Mar 2014 02:21:38 GMT
Working on a test case for https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted
in AMQ cluster.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5161087f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5161087f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5161087f

Branch: refs/heads/activemq-5.9
Commit: 5161087f31f6217d423c4683041c18bc5655889e
Parents: 251b7da
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Wed Oct 30 15:35:26 2013 -0400
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Tue Mar 11 21:16:58 2014 -0400

----------------------------------------------------------------------
 .../leveldb/test/ElectingLevelDBStoreTest.java  |  84 +--------
 .../test/ReplicatedLevelDBBrokerTest.java       | 187 +++++++++++++++++++
 .../leveldb/test/ZooKeeperTestSupport.java      | 111 +++++++++++
 3 files changed, 300 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
index f3c1581..c876f51 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
@@ -16,25 +16,17 @@
  */
 package org.apache.activemq.leveldb.test;
 
-import junit.framework.TestCase;
 import org.apache.activemq.Service;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.leveldb.CountDownFuture;
 import org.apache.activemq.leveldb.LevelDBStore;
 import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
-import org.apache.activemq.leveldb.util.FileSupport;
 import org.apache.activemq.store.MessageStore;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
@@ -43,37 +35,9 @@ import static org.junit.Assert.*;
 
 /**
  */
-public class ElectingLevelDBStoreTest {
-    protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class);
-
-    NIOServerCnxnFactory connector;
-
-    static File data_dir() {
-        return new File("target/activemq-data/leveldb-elections");
-    }
+public class ElectingLevelDBStoreTest extends ZooKeeperTestSupport {
 
-
-    @Before
-    public void setUp() throws Exception {
-        FileSupport.toRichFile(data_dir()).recursiveDelete();
-
-        System.out.println("Starting ZooKeeper");
-        ZooKeeperServer zk_server = new ZooKeeperServer();
-        zk_server.setTickTime(500);
-        zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
-        connector = new NIOServerCnxnFactory();
-        connector.configure(new InetSocketAddress(0), 100);
-        connector.startup(zk_server);
-        System.out.println("ZooKeeper started");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if( connector!=null ) {
-          connector.shutdown();
-          connector = null;
-        }
-    }
+    protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class);
 
     @Test(timeout = 1000*60*60)
     public void testElection() throws Exception {
@@ -210,50 +174,6 @@ public class ElectingLevelDBStoreTest {
         }
     }
 
-    static interface Task {
-        public void run() throws Exception;
-    }
-
-    private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
-        long timeMS = unit.toMillis(time);
-        long deadline = System.currentTimeMillis() + timeMS;
-        while (true) {
-            try {
-                task.run();
-                return;
-            } catch (Throwable e) {
-                long remaining = deadline - System.currentTimeMillis();
-                if( remaining <=0 ) {
-                    if( e instanceof RuntimeException ) {
-                        throw (RuntimeException)e;
-                    }
-                    if( e instanceof Error ) {
-                        throw (Error)e;
-                    }
-                    throw new RuntimeException(e);
-                }
-                Thread.sleep(Math.min(timeMS/10, remaining));
-            }
-        }
-    }
-
-    private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
-        long deadline =  System.currentTimeMillis()+timeout;
-        while( true ) {
-            for (CountDownFuture f:futures) {
-                if( f.await(1, TimeUnit.MILLISECONDS) ) {
-                    return f;
-                }
-            }
-            long remaining = deadline - System.currentTimeMillis();
-            if( remaining < 0 ) {
-                return null;
-            } else {
-                Thread.sleep(Math.min(remaining / 10, 100L));
-            }
-        }
-    }
-
     private CountDownFuture asyncStart(final Service service) {
         final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
         LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
new file mode 100644
index 0000000..d9e44b8
--- /dev/null
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Holds broker unit tests of the replicated leveldb store.
+ */
+public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
+
+    final SynchronousQueue<BrokerService> masterQueue = new SynchronousQueue<BrokerService>();
+    ArrayList<BrokerService> brokers = new ArrayList<BrokerService>();
+
+    /**
+     * Tries to replicate the problem reported at:
+     * https://issues.apache.org/jira/browse/AMQ-4837
+     */
+    @Test(timeout = 1000*60*10)
+    public void testAMQ4837() throws Exception {
+
+        // 1.	Start 3 activemq nodes.
+        startBrokerAsync(createBrokerNode("node-1"));
+        startBrokerAsync(createBrokerNode("node-2"));
+        startBrokerAsync(createBrokerNode("node-3"));
+
+        // 2.	Push a message to the master and browse the queue
+        System.out.println("Wait for master to start up...");
+        BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS);
+        assertNotNull("Master elected", master);
+        sendMessage(master, "Hello World #1");
+        assertEquals(1, browseMessages(master).size());
+
+        // 3.	Stop master node
+        System.out.println("Stopping master...");
+        master.stop();
+        master.waitUntilStopped();
+        BrokerService prevMaster = master;
+
+        // 4.	Push a message to the new master (Node2) and browse the queue using the web UI. Message summary and queue content ok.
+        System.out.println("Wait for new master to start up...");
+        master = masterQueue.poll(60, TimeUnit.SECONDS);
+        assertNotNull("Master elected", master);
+        sendMessage(master, "Hello World #2");
+        assertEquals(2, browseMessages(master).size());
+
+        // 5.	Start Node1
+        System.out.println("Starting previous master...");
+        prevMaster = createBrokerNode(prevMaster.getBrokerName());
+        startBrokerAsync(prevMaster);
+
+        // 6.	Stop master node (Node2)
+        System.out.println("Stopping master...");
+        master.stop();
+        master.waitUntilStopped();
+
+        // 7.	Browse the queue using the web UI on new master (Node3). Message summary ok however when clicking on the queue, no message details.
+        // An error (see below) is logged by the master, which attempts a restart.
+        System.out.println("Wait for new master to start up...");
+        master = masterQueue.poll(60, TimeUnit.SECONDS);
+        assertNotNull("Master elected", master);
+        assertEquals(2, browseMessages(master).size());
+
+    }
+
+    private void startBrokerAsync(BrokerService b) {
+        final BrokerService broker = b;
+        new Thread("Starting broker node: "+b.getBrokerName()){
+            @Override
+            public void run() {
+                try {
+                    broker.start();
+                    broker.waitUntilStarted();
+                    masterQueue.put(broker);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+    }
+
+    private void sendMessage(BrokerService brokerService, String body) throws Exception {
+        TransportConnector connector = brokerService.getTransportConnectors().get(0);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue("FOO"));
+            producer.send(session.createTextMessage(body));
+        } finally {
+            connection.close();
+        }
+    }
+
+    private ArrayList<String> browseMessages(BrokerService brokerService) throws Exception {
+        ArrayList<String> rc = new ArrayList<String>();
+        TransportConnector connector = brokerService.getTransportConnectors().get(0);
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            QueueBrowser browser = session.createBrowser(session.createQueue("FOO"));
+            Enumeration enumeration = browser.getEnumeration();
+            while (enumeration.hasMoreElements()) {
+                TextMessage textMessage = (TextMessage) enumeration.nextElement();
+                rc.add(textMessage.getText());
+            }
+        } finally {
+            connection.close();
+        }
+        return rc;
+    }
+
+    @After
+    public void closeBrokers() throws Exception {
+        for (BrokerService broker : brokers) {
+            try {
+                broker.stop();
+                broker.waitUntilStopped();
+            } catch (Exception e) {
+            }
+        }
+    }
+
+    private BrokerService createBrokerNode(String id) throws Exception {
+        BrokerService bs = new BrokerService();
+        bs.getManagementContext().setCreateConnector(false);
+        brokers.add(bs);
+        bs.setBrokerName(id);
+        bs.setPersistenceAdapter(createStoreNode(id));
+        bs.addConnector("tcp://0.0.0.0:0");
+        return bs;
+    }
+
+
+    private ElectingLevelDBStore createStoreNode(String id) {
+
+        // This little hack is in here because we give each of the 3 brokers
+        // different broker names so they can show up in JMX correctly,
+        // but the store needs to be configured with the same broker name
+        // so that they can find each other in ZK properly.
+        ElectingLevelDBStore store = new ElectingLevelDBStore() {
+            @Override
+            public void start() throws Exception {
+                this.setBrokerName("localhost");
+                super.start();
+            }
+        };
+        store.setDirectory(new File(data_dir(), id));
+        store.setReplicas(3);
+        store.setZkAddress("localhost:" + connector.getLocalPort());
+        store.setHostname("localhost");
+        store.setBind("tcp://0.0.0.0:0");
+        return store;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/5161087f/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
new file mode 100644
index 0000000..db65b43
--- /dev/null
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ZooKeeperTestSupport.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.test;
+
+import org.apache.activemq.leveldb.CountDownFuture;
+import org.apache.activemq.leveldb.util.FileSupport;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by chirino on 10/30/13.
+ */
+public class ZooKeeperTestSupport {
+
+    protected NIOServerCnxnFactory connector;
+
+    static File data_dir() {
+        return new File("target/activemq-data/leveldb-elections");
+    }
+
+
+    @Before
+    public void startZooKeeper() throws Exception {
+        FileSupport.toRichFile(data_dir()).recursiveDelete();
+
+        System.out.println("Starting ZooKeeper");
+        ZooKeeperServer zk_server = new ZooKeeperServer();
+        zk_server.setTickTime(500);
+        zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
+        connector = new NIOServerCnxnFactory();
+        connector.configure(new InetSocketAddress(0), 100);
+        connector.startup(zk_server);
+        System.out.println("ZooKeeper started");
+    }
+
+    @After
+    public void stopZooKeeper() throws Exception {
+        if( connector!=null ) {
+          connector.shutdown();
+          connector = null;
+        }
+    }
+
+
+    protected static interface Task {
+        public void run() throws Exception;
+    }
+
+    protected  void within(int time, TimeUnit unit, Task task) throws InterruptedException {
+        long timeMS = unit.toMillis(time);
+        long deadline = System.currentTimeMillis() + timeMS;
+        while (true) {
+            try {
+                task.run();
+                return;
+            } catch (Throwable e) {
+                long remaining = deadline - System.currentTimeMillis();
+                if( remaining <=0 ) {
+                    if( e instanceof RuntimeException ) {
+                        throw (RuntimeException)e;
+                    }
+                    if( e instanceof Error ) {
+                        throw (Error)e;
+                    }
+                    throw new RuntimeException(e);
+                }
+                Thread.sleep(Math.min(timeMS/10, remaining));
+            }
+        }
+    }
+
+    protected CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
+        long deadline =  System.currentTimeMillis()+timeout;
+        while( true ) {
+            for (CountDownFuture f:futures) {
+                if( f.await(1, TimeUnit.MILLISECONDS) ) {
+                    return f;
+                }
+            }
+            long remaining = deadline - System.currentTimeMillis();
+            if( remaining < 0 ) {
+                return null;
+            } else {
+                Thread.sleep(Math.min(remaining / 10, 100L));
+            }
+        }
+    }
+}


Mime
View raw message