Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79ACE10C64 for ; Thu, 5 Dec 2013 18:37:22 +0000 (UTC) Received: (qmail 23624 invoked by uid 500); 5 Dec 2013 18:37:22 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 23512 invoked by uid 500); 5 Dec 2013 18:37:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 23493 invoked by uid 99); 5 Dec 2013 18:37:21 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Dec 2013 18:37:21 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9094682C01C; Thu, 5 Dec 2013 18:37:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chirino@apache.org To: commits@activemq.apache.org Date: Thu, 05 Dec 2013 18:37:22 -0000 Message-Id: In-Reply-To: <180865941f5847be9fe8fe68ee5f9f49@git.apache.org> References: <180865941f5847be9fe8fe68ee5f9f49@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: Fixes https://issues.apache.org/jira/browse/AMQ-4923: Replicated LevelDB: Loss of broker Quorum fails to fully stop the master Fixes https://issues.apache.org/jira/browse/AMQ-4923: Replicated LevelDB: Loss of broker Quorum fails to fully stop the master Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ed8e4eae Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ed8e4eae Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ed8e4eae Branch: refs/heads/trunk Commit: ed8e4eae8f79b6f88562d3008292a1927b21786d Parents: 044c2d9 Author: Hiram Chirino Authored: Thu Dec 5 13:27:59 2013 -0500 Committer: Hiram Chirino Committed: Thu Dec 5 13:38:52 2013 -0500 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 4 + .../org/apache/activemq/leveldb/DBManager.scala | 11 +- .../test/ReplicatedLevelDBBrokerTest.java | 191 ++++++++++++++++++- 3 files changed, 199 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 575e313..fe6f59b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -878,6 +878,10 @@ public class BrokerService implements Service { } } + public boolean isStopped() { + return stopped.get(); + } + /** * A helper method to block the caller thread until the broker has fully started * @return boolean true if wait succeeded false if broker was not started or was stopped http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 722d932..cfcce78 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -35,6 +35,7 @@ import util.TimeMetric import scala.Some import org.apache.activemq.ActiveMQMessageAuditNoSync import org.fusesource.hawtdispatch +import org.apache.activemq.broker.SuppressReplyException case class EntryLocator(qid:Long, seq:Long) case class DataLocator(store:LevelDBStore, pos:Long, len:Int) { @@ -569,9 +570,6 @@ class DBManager(val parent:LevelDBStore) { def drainFlushes:Unit = { dispatchQueue.assertExecuting() - if( !started ) { - return - } // Some UOWs may have been canceled. import collection.JavaConversions._ @@ -590,7 +588,12 @@ class DBManager(val parent:LevelDBStore) { assert(action!=null) } } - Some(uow) + if( !started ) { + uow.onCompleted(new SuppressReplyException("Store stopped")) + None + } else { + Some(uow) + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java index a8e743f..8910981 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import javax.jms.*; @@ -30,10 +31,17 @@ import javax.management.openmbean.CompositeData; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Enumeration; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.io.FileUtils; import static org.junit.Assert.*; @@ -82,6 +90,165 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } } + public interface Client{ + public void execute(Connection connection) throws Exception; + } + + protected Thread startFailoverClient(String name, final Client client) throws IOException, URISyntaxException { + String url = "failover://(tcp://localhost:"+port+")?maxReconnectDelay=500&nested.wireFormat.maxInactivityDuration=1000"; + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + Thread rc = new Thread(name) { + @Override + public void run() { + Connection connection = null; + try { + connection = factory.createConnection(); + client.execute(connection); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (JMSException e) { + } + } + } + }; + rc.start(); + return rc; + } + + @Test + @Ignore + public void testReplicationQuorumLoss() throws Throwable { + + System.out.println("======================================"); + System.out.println(" Start 2 ActiveMQ nodes."); + System.out.println("======================================"); + startBrokerAsync(createBrokerNode("node-1", port)); + startBrokerAsync(createBrokerNode("node-2", port)); + BrokerService master = waitForNextMaster(); + System.out.println("======================================"); + System.out.println(" Start the producer and consumer"); + System.out.println("======================================"); + + final AtomicBoolean stopClients = new AtomicBoolean(false); + final ArrayBlockingQueue errors = new ArrayBlockingQueue(100); + final AtomicLong receivedCounter = new AtomicLong(); + final AtomicLong sentCounter = new AtomicLong(); + Thread producer = startFailoverClient("producer", new Client() { + @Override + public void execute(Connection connection) throws Exception { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("test")); + long actual = 0; + while(!stopClients.get()) { + TextMessage msg = session.createTextMessage("Hello World"); + msg.setLongProperty("id", actual++); + producer.send(msg); + sentCounter.incrementAndGet(); + } + } + }); + + Thread consumer = startFailoverClient("consumer", new Client() { + @Override + public void execute(Connection connection) throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + long expected = 0; + while(!stopClients.get()) { + Message msg = consumer.receive(200); + if( msg!=null ) { + long actual = msg.getLongProperty("id"); + if( actual != expected ) { + errors.offer("Received got unexpected msg id: "+actual+", expected: "+expected); + } + msg.acknowledge(); + expected = actual+1; + receivedCounter.incrementAndGet(); + } + } + } + }); + + try { + assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS); + assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS); + assertNull(errors.poll()); + + System.out.println("======================================"); + System.out.println(" Master should stop once the quorum is lost."); + System.out.println("======================================"); + ArrayList stopped = stopSlaves();// stopping the slaves should kill the quorum. + assertStopsWithin(master, 10, TimeUnit.SECONDS); + assertNull(errors.poll()); // clients should not see an error since they are failover clients. + stopped.add(master); + + System.out.println("======================================"); + System.out.println(" Restart the slave. Clients should make progress again.."); + System.out.println("======================================"); + startBrokersAsync(createBrokerNodes(stopped)); + assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS); + assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS); + assertNull(errors.poll()); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + // Wait for the clients to stop.. + stopClients.set(true); + producer.join(); + consumer.join(); + } + } + + protected void startBrokersAsync(ArrayList brokers) { + for (BrokerService broker : brokers) { + startBrokerAsync(broker); + } + } + + protected ArrayList createBrokerNodes(ArrayList brokers) throws Exception { + ArrayList rc = new ArrayList(); + for (BrokerService b : brokers) { + rc.add(createBrokerNode(b.getBrokerName(), connectPort(b))); + } + return rc; + } + + protected ArrayList stopSlaves() throws Exception { + ArrayList rc = new ArrayList(); + for (BrokerService broker : brokers) { + if( broker.isSlave() ) { + System.out.println("Stopping slave: "+broker.getBrokerName()); + broker.stop(); + broker.waitUntilStopped(); + rc.add(broker); + } + } + brokers.removeAll(rc); + return rc; + } + + protected void assertStopsWithin(final BrokerService master, int timeout, TimeUnit unit) throws InterruptedException { + within(timeout, unit, new Task(){ + @Override + public void run() throws Exception { + assertTrue(master.isStopped()); + } + }); + } + + protected void assertCounterMakesProgress(final AtomicLong counter, int timeout, TimeUnit unit) throws InterruptedException { + final long initial = counter.get(); + within(timeout, unit, new Task(){ + public void run() throws Exception { + assertTrue(initial < counter.get()); + } + }); + } public void testAMQ4837(boolean jmx) throws Throwable { @@ -205,8 +372,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { private ArrayList browseMessagesViaJMS(BrokerService brokerService) throws Exception { ArrayList rc = new ArrayList(); - TransportConnector connector = brokerService.getTransportConnectors().get(0); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri()); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:"+ connectPort(brokerService)); Connection connection = factory.createConnection(); try { connection.start(); @@ -223,6 +389,19 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { return rc; } + private int connectPort(BrokerService brokerService) throws IOException, URISyntaxException { + TransportConnector connector = brokerService.getTransportConnectors().get(0); + return connector.getConnectUri().getPort(); + } + + int port; + @Before + public void findFreePort() throws Exception { + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + } + @After public void stopBrokers() throws Exception { for (BrokerService broker : brokers) { @@ -235,12 +414,18 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } private BrokerService createBrokerNode(String id) throws Exception { + return createBrokerNode(id, 0); + } + + private BrokerService createBrokerNode(String id, int port) throws Exception { BrokerService bs = new BrokerService(); bs.getManagementContext().setCreateConnector(false); brokers.add(bs); bs.setBrokerName(id); bs.setPersistenceAdapter(createStoreNode(id)); - bs.addConnector("tcp://0.0.0.0:0"); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("tcp://0.0.0.0:" + port)); + bs.addConnector(connector); return bs; }