activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [14/20] git commit: Fix for: https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.
Date Wed, 12 Mar 2014 02:21:40 GMT
Fix for: https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.

- Log rotation was causing a premature index snapshot to be taken on the slave (a snapshot
taken while the slave was still synchronizing).
- Also fix an issue where the append position displayed in JMX for the master was incorrect.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/50f37beb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/50f37beb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/50f37beb

Branch: refs/heads/activemq-5.9
Commit: 50f37beb87facd24c07e66b4ed82a30a9d410278
Parents: 926a435
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Thu Oct 31 12:18:48 2013 -0400
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Tue Mar 11 21:18:49 2014 -0400

----------------------------------------------------------------------
 activemq-leveldb-store/pom.xml                  |   6 +
 .../apache/activemq/leveldb/LevelDBClient.scala |  19 ++-
 .../leveldb/replicated/SlaveLevelDBStore.scala  |  16 +-
 .../test/ReplicatedLevelDBBrokerTest.java       | 160 ++++++++++++++-----
 4 files changed, 156 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/pom.xml b/activemq-leveldb-store/pom.xml
index 9535b19..a2dd64b 100644
--- a/activemq-leveldb-store/pom.xml
+++ b/activemq-leveldb-store/pom.xml
@@ -147,6 +147,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- For Optional Snappy Compression -->
     <dependency>
       <groupId>org.xerial.snappy</groupId>

http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 1c6adec..dbf6512 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -625,11 +625,15 @@ class LevelDBClient(store: LevelDBStore) {
     log = createLog
     log.logSize = store.logSize
     log.on_log_rotate = ()=> {
+      post_log_rotate
+    }
+  }
+
+  def post_log_rotate ={
       // We snapshot the index every time we rotate the logs.
       writeExecutor {
         snapshotIndex(false)
       }
-    }
   }
 
   def replay_init() = {
@@ -927,7 +931,16 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
-  var wal_append_position = 0L
+
+  var stored_wal_append_position = 0L
+
+  def wal_append_position = this.synchronized {
+    if (log!=null && log.isOpen) {
+      log.appender_limit
+    } else {
+      stored_wal_append_position
+    }
+  }
 
   def stop() = this.synchronized {
     if( writeExecutor!=null ) {
@@ -948,7 +961,7 @@ class LevelDBClient(store: LevelDBStore) {
         if (log!=null && log.isOpen) {
           log.close
           copyDirtyIndexToSnapshot
-          wal_append_position = log.appender_limit
+          stored_wal_append_position = log.appender_limit
           log = null
         }
         if( plist!=null ) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 4239a0b..f1a47f7 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.leveldb.replicated
 
-import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
 import org.apache.activemq.util.ServiceStopper
 import java.util
 import org.fusesource.hawtdispatch._
@@ -53,6 +53,16 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
 
   var status = "initialized"
 
+  override def createClient = new LevelDBClient(this) {
+    // We don't want to start doing index snapshots until
+    // he slave is caught up.
+    override def post_log_rotate: Unit = {
+      if( caughtUp ) {
+        super.post_log_rotate
+      }
+    }
+  }
+
   override def doStart() = {
     client.init()
     if (purgeOnStatup) {
@@ -100,7 +110,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       // the stashed data might be the best option to become the master.
       stash(directory)
       delete_store(directory)
-      debug("Log replicaiton session connected")
+      debug("Log replication session connected")
       session.request_then(SYNC_ACTION, null) { body =>
         val response = JsonCodec.decode(body, classOf[SyncResponse])
         transfer_missing(response)
@@ -165,7 +175,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
         command.action match {
           case WAL_ACTION =>
             val value = JsonCodec.decode(command.body, classOf[LogWrite])
-            if( caughtUp && value.offset ==0 ) {
+            if( caughtUp && value.offset ==0 && value.file!=0 ) {
               client.log.rotate
             }
             val file = client.log.next_log(value.file)

http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
index d9e44b8..a8e743f 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
@@ -21,17 +21,22 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import javax.jms.*;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
 import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 /**
  * Holds broker unit tests of the replicated leveldb store.
@@ -46,50 +51,109 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
      * https://issues.apache.org/jira/browse/AMQ-4837
      */
     @Test(timeout = 1000*60*10)
-    public void testAMQ4837() throws Exception {
+    public void testAMQ4837viaJMS() throws Throwable {
+        testAMQ4837(false);
+    }
 
-        // 1.	Start 3 activemq nodes.
-        startBrokerAsync(createBrokerNode("node-1"));
-        startBrokerAsync(createBrokerNode("node-2"));
-        startBrokerAsync(createBrokerNode("node-3"));
+  /**
+     * Tries to replicate the problem reported at:
+     * https://issues.apache.org/jira/browse/AMQ-4837
+     */
+    @Test(timeout = 1000*60*10)
+    public void testAMQ4837viaJMX() throws Throwable {
+        for (int i = 0; i < 2; i++) {
+            resetDataDirs();
+            testAMQ4837(true);
+            stopBrokers();
+        }
+    }
 
-        // 2.	Push a message to the master and browse the queue
-        System.out.println("Wait for master to start up...");
-        BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS);
-        assertNotNull("Master elected", master);
-        sendMessage(master, "Hello World #1");
-        assertEquals(1, browseMessages(master).size());
+    @Before
+    public void resetDataDirs() throws IOException {
+        deleteDirectory("node-1");
+        deleteDirectory("node-2");
+        deleteDirectory("node-3");
+    }
 
-        // 3.	Stop master node
-        System.out.println("Stopping master...");
-        master.stop();
-        master.waitUntilStopped();
-        BrokerService prevMaster = master;
+    protected void deleteDirectory(String s) throws IOException {
+        try {
+            FileUtils.deleteDirectory(new File(data_dir(), s));
+        } catch (IOException e) {
+        }
+    }
 
-        // 4.	Push a message to the new master (Node2) and browse the queue using the web UI. Message summary and queue content ok.
-        System.out.println("Wait for new master to start up...");
-        master = masterQueue.poll(60, TimeUnit.SECONDS);
-        assertNotNull("Master elected", master);
-        sendMessage(master, "Hello World #2");
-        assertEquals(2, browseMessages(master).size());
 
-        // 5.	Start Node1
-        System.out.println("Starting previous master...");
-        prevMaster = createBrokerNode(prevMaster.getBrokerName());
-        startBrokerAsync(prevMaster);
+    public void testAMQ4837(boolean jmx) throws Throwable {
+
+        try {
+            System.out.println("======================================");
+            System.out.println("1.	Start 3 activemq nodes.");
+            System.out.println("======================================");
+            startBrokerAsync(createBrokerNode("node-1"));
+            startBrokerAsync(createBrokerNode("node-2"));
+            startBrokerAsync(createBrokerNode("node-3"));
+
+            BrokerService master = waitForNextMaster();
+            System.out.println("======================================");
+            System.out.println("2.	Push a message to the master and browse the queue");
+            System.out.println("======================================");
+            sendMessage(master, pad("Hello World #1", 1024));
+            assertEquals(1, browseMessages(master, jmx).size());
 
-        // 6.	Stop master node (Node2)
-        System.out.println("Stopping master...");
+            System.out.println("======================================");
+            System.out.println("3.	Stop master node");
+            System.out.println("======================================");
+            stop(master);
+            BrokerService prevMaster = master;
+            master = waitForNextMaster();
+
+            System.out.println("======================================");
+            System.out.println("4.	Push a message to the new master and browse the queue. Message summary and queue content ok.");
+            System.out.println("======================================");
+            assertEquals(1, browseMessages(master, jmx).size());
+            sendMessage(master, pad("Hello World #2", 1024));
+            assertEquals(2, browseMessages(master, jmx).size());
+
+            System.out.println("======================================");
+            System.out.println("5.	Restart the stopped node & 6. stop current master");
+            System.out.println("======================================");
+            prevMaster = createBrokerNode(prevMaster.getBrokerName());
+            startBrokerAsync(prevMaster);
+            stop(master);
+
+            master = waitForNextMaster();
+            System.out.println("======================================");
+            System.out.println("7.	Browse the queue on new master");
+            System.out.println("======================================");
+            assertEquals(2, browseMessages(master, jmx).size());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            throw e;
+        }
+
+    }
+
+    private void stop(BrokerService master) throws Exception {
+        System.out.println("Stopping "+master.getBrokerName());
         master.stop();
         master.waitUntilStopped();
+    }
 
-        // 7.	Browse the queue using the web UI on new master (Node3). Message summary ok however when clicking on the queue, no message details.
-        // An error (see below) is logged by the master, which attempts a restart.
-        System.out.println("Wait for new master to start up...");
-        master = masterQueue.poll(60, TimeUnit.SECONDS);
+    private BrokerService waitForNextMaster() throws InterruptedException {
+        System.out.println("Wait for master to start up...");
+        BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS);
         assertNotNull("Master elected", master);
-        assertEquals(2, browseMessages(master).size());
+        assertFalse(master.isSlave());
+        assertNull("Only one master elected at a time..", masterQueue.peek());
+        System.out.println("Master started: " + master.getBrokerName());
+        return master;
+    }
 
+    private String pad(String value, int size) {
+        while( value.length() < size ) {
+            value += " ";
+        }
+        return value;
     }
 
     private void startBrokerAsync(BrokerService b) {
@@ -121,8 +185,25 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
             connection.close();
         }
     }
+    private ArrayList<String> browseMessages(BrokerService brokerService, boolean jmx) throws Exception {
+        if( jmx ) {
+            return browseMessagesViaJMX(brokerService);
+        } else {
+            return browseMessagesViaJMS(brokerService);
+        }
+    }
+
+    private ArrayList<String> browseMessagesViaJMX(BrokerService brokerService) throws Exception {
+        ArrayList<String> rc = new ArrayList<String>();
+        ObjectName on = new ObjectName("org.apache.activemq:type=Broker,brokerName="+brokerService.getBrokerName()+",destinationType=Queue,destinationName=FOO");
+        CompositeData[] browse = (CompositeData[]) ManagementFactory.getPlatformMBeanServer().invoke(on, "browse", null, null);
+        for (CompositeData cd : browse) {
+            rc.add(cd.get("Text").toString()) ;
+        }
+        return rc;
+    }
 
-    private ArrayList<String> browseMessages(BrokerService brokerService) throws Exception {
+    private ArrayList<String> browseMessagesViaJMS(BrokerService brokerService) throws Exception {
         ArrayList<String> rc = new ArrayList<String>();
         TransportConnector connector = brokerService.getTransportConnectors().get(0);
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
@@ -143,14 +224,14 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
     }
 
     @After
-    public void closeBrokers() throws Exception {
+    public void stopBrokers() throws Exception {
         for (BrokerService broker : brokers) {
             try {
-                broker.stop();
-                broker.waitUntilStopped();
+                stop(broker);
             } catch (Exception e) {
             }
         }
+        brokers.clear();
     }
 
     private BrokerService createBrokerNode(String id) throws Exception {
@@ -178,6 +259,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
             }
         };
         store.setDirectory(new File(data_dir(), id));
+        store.setContainer(id);
         store.setReplicas(3);
         store.setZkAddress("localhost:" + connector.getLocalPort());
         store.setHostname("localhost");


Mime
View raw message