activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [11/15] activemq-6 git commit: Refactored the testsuite a bit
Date Fri, 06 Mar 2015 22:30:44 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/HierarchicalObjectRepositoryTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/HierarchicalObjectRepositoryTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/HierarchicalObjectRepositoryTest.java
new file mode 100644
index 0000000..e1b5b72
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/HierarchicalObjectRepositoryTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.core.settings.impl.HierarchicalObjectRepository;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+@BMRules(rules = { @BMRule(name = "modify map during iteration",
+         targetClass = "org.apache.activemq.core.settings.impl.HierarchicalObjectRepository",
+         targetMethod = "getPossibleMatches(String)", targetLocation = "AT INVOKE java.util.HashMap.put",
+         action = "org.apache.activemq.tests.extras.byteman.HierarchicalObjectRepositoryTest.bum()"), })
+public class HierarchicalObjectRepositoryTest
+{
+   private static final String A = "a.";
+   private static final int TOTAL = 100;
+   private static CountDownLatch latch;
+   private static CountDownLatch latch2;
+   private ExecutorService executor;
+   private HierarchicalObjectRepository<String> repo;
+
+   @Before
+   public void setUp()
+   {
+      latch = new CountDownLatch(1);
+      latch2 = new CountDownLatch(1);
+      executor = Executors.newSingleThreadExecutor();
+      repo = new HierarchicalObjectRepository<String>();
+      addToRepo(repo, A);
+   }
+
+   static void addToRepo(HierarchicalObjectRepository<String> repo0, String pattern)
+   {
+      for (int i = 0; i < TOTAL; i++)
+      {
+         repo0.addMatch(pattern + i + ".*", String.valueOf(i));
+      }
+   }
+
+   @After
+   public void tearDown() throws InterruptedException
+   {
+      latch.countDown();
+      latch2.countDown();
+      executor.shutdown();
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+   }
+
+   private class Clearer implements Runnable
+   {
+      private final int code;
+
+      public Clearer(int code)
+      {
+         this.code = code;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            latch.await();
+         }
+         catch (InterruptedException e)
+         {
+            throw new RuntimeException(e);
+         }
+
+         switch (code)
+         {
+            case 0:
+               repo.clear();
+               break;
+            case 1:
+               addToRepo(repo, "bb.");
+               break;
+            case 2:
+               for (int i = TOTAL / 2; i < TOTAL; i++)
+               {
+                  repo.removeMatch(A + i + ".*");
+               }
+               break;
+            default:
+               throw new RuntimeException();
+         }
+
+         latch2.countDown();
+      }
+   }
+
+   @Test
+   public void testConcurrentModificationsClear()
+   {
+      executor.execute(new Clearer(0));
+      repo.getMatch(A + (TOTAL - 10) + ".foobar");
+      Assert.assertEquals("Byteman rule failed?", 0, latch.getCount());
+   }
+
+   @Test
+   public void testConcurrentModificationsAdd()
+   {
+      executor.execute(new Clearer(1));
+      repo.getMatch(A + (TOTAL - 10) + ".foobar");
+      Assert.assertEquals("Byteman rule failed?", 0, latch.getCount());
+   }
+
+   @Test
+   public void testConcurrentModificationsRemove()
+   {
+      executor.execute(new Clearer(2));
+      repo.getMatch(A + (TOTAL - 10) + ".foobar");
+      Assert.assertEquals("Byteman rule failed?", 0, latch.getCount());
+   }
+
+   public static void bum()
+   {
+      latch.countDown();
+      try
+      {
+         latch2.await(3, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         // no op
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/JMSBridgeReconnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/JMSBridgeReconnectionTest.java
new file mode 100644
index 0000000..3be3f73
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/JMSBridgeReconnectionTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import org.apache.activemq.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.core.message.impl.MessageInternal;
+import org.apache.activemq.core.protocol.core.Packet;
+import org.apache.activemq.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
+import org.apache.activemq.jms.bridge.QualityOfServiceMode;
+import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
+import org.apache.activemq.jms.server.JMSServerManager;
+import org.apache.activemq.tests.extras.jms.bridge.BridgeTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(BMUnitRunner.class)
+public class JMSBridgeReconnectionTest extends BridgeTestBase
+{
+
+   @Test
+   @BMRules
+         (
+               rules =
+                     {
+                           @BMRule
+                                 (
+                                       name = "trace clientsessionimpl send",
+                                       targetClass = "org.apache.activemq.core.protocol.core.impl.ChannelImpl",
+                                       targetMethod = "send",
+                                       targetLocation = "ENTRY",
+                                       action = "org.apache.activemq.tests.extras.byteman.JMSBridgeReconnectionTest.pause($1);"
+                                 ),
+                           @BMRule
+                                 (
+                                       name = "trace sendRegularMessage",
+                                       targetClass = "org.apache.activemq.core.client.impl.ClientProducerImpl",
+                                       targetMethod = "sendRegularMessage",
+                                       targetLocation = "ENTRY",
+                                       action = "org.apache.activemq.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);"
+                                 )
+                     }
+         )
+   public void performCrashDestinationStopBridge() throws Exception
+   {
+      activeMQServer = jmsServer1;
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      final JMSBridgeImpl bridge =
+            new JMSBridgeImpl(factInUse0,
+                  factInUse1,
+                  sourceQueueFactory,
+                  targetQueueFactory,
+                  null,
+                  null,
+                  null,
+                  null,
+                  null,
+                  1000,
+                  -1,
+                  QualityOfServiceMode.DUPLICATES_OK,
+                  10,
+                  -1,
+                  null,
+                  null,
+                  false);
+
+      addActiveMQComponent(bridge);
+      bridge.setTransactionManager(newTransactionManager());
+      bridge.start();
+      final CountDownLatch latch = new CountDownLatch(20);
+      Thread clientThread = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            while (bridge.isStarted())
+            {
+               try
+               {
+                  sendMessages(cf0, sourceQueue, 0, 1, false, false);
+                  latch.countDown();
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         }
+      });
+
+      clientThread.start();
+
+      stopLatch.await(10000, TimeUnit.MILLISECONDS);
+
+      bridge.stop();
+
+      clientThread.join(5000);
+
+      assertTrue(!clientThread.isAlive());
+   }
+
+   public static void pause(Packet packet)
+   {
+      if (packet.getType() == PacketImpl.SESS_SEND)
+      {
+         SessionSendMessage sendMessage = (SessionSendMessage) packet;
+         if (sendMessage.getMessage().containsProperty("__HQ_CID") && count < 0 && !stopped)
+         {
+            try
+            {
+               activeMQServer.stop();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+            stopped = true;
+            try
+            {
+               Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            stopLatch.countDown();
+         }
+      }
+   }
+
+   static JMSServerManager activeMQServer;
+   static boolean stopped = false;
+   static int count = 20;
+   static CountDownLatch stopLatch = new CountDownLatch(1);
+   public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits)
+   {
+      if (msgI.containsProperty("__HQ_CID"))
+      {
+         count--;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/LatencyTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/LatencyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/LatencyTest.java
new file mode 100644
index 0000000..68914fe
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/LatencyTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class LatencyTest extends ServiceTestBase
+{
+   /*
+   * simple test to make sure connect still works with some network latency  built into netty
+   * */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "trace ClientBootstrap.connect",
+                     targetClass = "org.jboss.netty.bootstrap.ClientBootstrap",
+                     targetMethod = "connect",
+                     targetLocation = "ENTRY",
+                     action = "System.out.println(\"netty connecting\")"
+                  ),
+               @BMRule
+                  (
+                     name = "sleep OioWorker.run",
+                     targetClass = "org.jboss.netty.channel.socket.oio.OioWorker",
+                     targetMethod = "run",
+                     targetLocation = "ENTRY",
+                     action = "Thread.sleep(500)"
+                  )
+            }
+      )
+   public void testLatency() throws Exception
+   {
+      ActiveMQServer server = createServer(createDefaultConfig(true));
+      server.start();
+      ServerLocator locator = createNettyNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession();
+      session.close();
+      server.stop();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/MessageCopyTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/MessageCopyTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/MessageCopyTest.java
new file mode 100644
index 0000000..18b00d5
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/MessageCopyTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.tests.util.RandomUtil;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Clebert Suconic
+ */
+@RunWith(BMUnitRunner.class)
+public class MessageCopyTest
+{
+   @Test
+   @BMRules
+      (
+
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "message-copy0",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerMessageImpl",
+                     targetMethod = "copy()",
+                     targetLocation = "ENTRY",
+                     action = "System.out.println(\"copy\"), waitFor(\"encode-done\")"
+                  ),
+               @BMRule
+                  (
+                     name = "message-copy-done",
+                     targetClass = "org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendMessage",
+                     targetMethod = "encode(org.apache.activemq.spi.core.protocol.RemotingConnection)",
+                     targetLocation = "EXIT",
+                     action = "System.out.println(\"encodeDone\"), signalWake(\"encode-done\", true)"
+                  ),
+               @BMRule
+                  (
+                     name = "message-copy1",
+                     targetClass = "org.apache.activemq.core.buffers.impl.ChannelBufferWrapper",
+                     targetMethod = "copy(int, int)",
+                     condition = "Thread.currentThread().getName().equals(\"T1\")",
+                     targetLocation = "EXIT",
+                     action = "System.out.println(\"setIndex at \" + Thread.currentThread().getName()), waitFor(\"finish-read\")"
+                  ),
+               @BMRule(
+                  name = "JMSServer.stop wait-init",
+                  targetClass = "org.apache.activemq.tests.extras.byteman.MessageCopyTest",
+                  targetMethod = "simulateRead",
+                  targetLocation = "EXIT",
+                  action = "signalWake(\"finish-read\", true)"
+               )
+            }
+      )
+   public void testMessageCopyIssue() throws Exception
+   {
+      final long RUNS = 1;
+      final ServerMessageImpl msg = new ServerMessageImpl(123, 18);
+
+      msg.setMessageID(RandomUtil.randomLong());
+      msg.encodeMessageIDToBuffer();
+      msg.setAddress(new SimpleString("Batatantkashf aksjfh aksfjh askfdjh askjfh "));
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      int T1_number = 1;
+      int T2_number = 1;
+
+      final CountDownLatch latchAlign = new CountDownLatch(T1_number + T2_number);
+      final CountDownLatch latchReady = new CountDownLatch(1);
+      class T1 extends Thread
+      {
+         T1()
+         {
+            super("T1");
+         }
+
+         @Override
+         public void run()
+         {
+            latchAlign.countDown();
+            try
+            {
+               latchReady.await();
+            }
+            catch (Exception ignored)
+            {
+            }
+
+            for (int i = 0; i < RUNS; i++)
+            {
+               try
+               {
+                  ServerMessageImpl newMsg = (ServerMessageImpl) msg.copy();
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         }
+      }
+
+      class T2 extends Thread
+      {
+         T2()
+         {
+            super("T2");
+         }
+
+         @Override
+         public void run()
+         {
+            latchAlign.countDown();
+            try
+            {
+               latchReady.await();
+            }
+            catch (Exception ignored)
+            {
+            }
+
+            for (int i = 0; i < RUNS; i++)
+            {
+               try
+               {
+                  SessionSendMessage ssm = new SessionSendMessage(msg);
+                  ActiveMQBuffer buf = ssm.encode(null);
+                  System.out.println("reading at buf = " + buf);
+                  simulateRead(buf);
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         }
+      }
+
+
+      ArrayList<Thread> threads = new ArrayList<Thread>();
+
+      for (int i = 0; i < T1_number; i++)
+      {
+         T1 t = new T1();
+         threads.add(t);
+         t.start();
+      }
+
+      for (int i = 0; i < T2_number; i++)
+      {
+         T2 t2 = new T2();
+         threads.add(t2);
+         t2.start();
+      }
+
+      latchAlign.await();
+
+      latchReady.countDown();
+
+      for (Thread t : threads)
+      {
+         t.join();
+      }
+
+      Assert.assertEquals(0, errors.get());
+   }
+
+   private void simulateRead(ActiveMQBuffer buf)
+   {
+      buf.setIndex(buf.capacity() / 2, buf.capacity() / 2);
+
+      // ok this is not actually happening during the read process, but changing this shouldn't affect the buffer on copy
+      buf.writeBytes(new byte[1024]);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/OrphanedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/OrphanedConsumerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/OrphanedConsumerTest.java
new file mode 100644
index 0000000..6826704
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/OrphanedConsumerTest.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Clebert Suconic
+ */
+@RunWith(BMUnitRunner.class)
+public class OrphanedConsumerTest extends ServiceTestBase
+{
+
+   private static boolean conditionActive = true;
+
+   public static final boolean isConditionActive()
+   {
+      return conditionActive;
+   }
+
+
+   public static final void setConditionActive(boolean _conditionActive)
+   {
+      conditionActive = _conditionActive;
+   }
+
+
+   public static void throwException() throws Exception
+   {
+      throw new InterruptedException("nice.. I interrupted this!");
+   }
+
+   private ActiveMQServer server;
+
+   private ServerLocator locator;
+
+   static ActiveMQServer staticServer;
+
+   /**
+    * {@link #leavingCloseOnTestCountersWhileClosing()} will set this in case of any issues.
+    * the test must then validate for this being null
+    */
+   static AssertionError verification;
+
+   /**
+    * This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()}
+    * */
+   public static void leavingCloseOnTestCountersWhileClosing()
+   {
+      if (staticServer.getConnectionCount() == 0)
+      {
+         verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!");
+      }
+
+      if (staticServer.getSessions().size() == 0)
+      {
+         verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!");
+      }
+   }
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      setConditionActive(true);
+      /** I'm using the internal method here because closing
+       *  this locator on tear down would hang.
+       *  as we are tweaking with the internal state and making it fail */
+      locator = internalCreateNonHALocator(true);
+   }
+
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+      setConditionActive(false);
+
+      staticServer = null;
+   }
+
+
+   /**
+    * This is like being two tests in one:
+    * I - validating that any exception during the close wouldn't stop connection from being closed
+    * II - validating that the connection is only removed at the end of the process and you wouldn't see
+    *      inconsistencies on management
+    * @throws Exception
+    */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "closeExit",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "AT EXIT",
+                     condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
+                     action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
+                  ),
+               @BMRule
+                  (
+                     name = "closeEnter",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "ENTRY",
+                     condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
+                     action = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
+                  )
+
+            }
+      )
+   public void testOrphanedConsumers() throws Exception
+   {
+      internalTestOrphanedConsumers(false);
+   }
+
+
+   /**
+    * This is like being two tests in one:
+    * I - validating that any exception during the close wouldn't stop connection from being closed
+    * II - validating that the connection is only removed at the end of the process and you wouldn't see
+    *      inconsistencies on management
+    * @throws Exception
+    */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "closeExit",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "AT EXIT",
+                     condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
+                     action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
+                  ),
+               @BMRule
+                  (
+                     name = "closeEnter",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "ENTRY",
+                     condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
+                     action = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
+                  )
+
+            }
+      )
+   public void testOrphanedConsumersByManagement() throws Exception
+   {
+      internalTestOrphanedConsumers(true);
+   }
+
+   /**
+    *
+    * @param useManagement true = it will use a management operation to make the connection failure, false through ping
+    * @throws Exception
+    */
+   private void internalTestOrphanedConsumers(boolean useManagement) throws Exception
+   {
+      final int NUMBER_OF_MESSAGES = 2;
+      server = createServer(true, true);
+      server.start();
+      staticServer = server;
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(true);
+      locator.setConnectionTTL(1000);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setReconnectAttempts(0);
+      // We are not interested on consumer-window-size on this test
+      // We want that every message is delivered
+      // as we asserting for number of consumers available and round-robin on delivery
+      locator.setConsumerWindowSize(-1);
+
+      ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(true, true, 0);
+
+      session.createQueue("queue", "queue1", true);
+      session.createQueue("queue", "queue2", true);
+
+      ClientProducer prod = session.createProducer("queue");
+
+      ClientConsumer consumer = session.createConsumer("queue1");
+      ClientConsumer consumer2 = session.createConsumer("queue2");
+
+
+      Queue queue1 = server.locateQueue(new SimpleString("queue1"));
+
+      Queue queue2 = server.locateQueue(new SimpleString("queue2"));
+
+      session.start();
+
+
+      if (!useManagement)
+      {
+         sf.stopPingingAfterOne();
+
+         for (long timeout = System.currentTimeMillis() + 6000; timeout > System.currentTimeMillis() && server.getConnectionCount() != 0; )
+         {
+            Thread.sleep(100);
+         }
+
+         // an extra second to avoid races of something closing the session while we are asserting it
+         Thread.sleep(1000);
+      }
+      else
+      {
+         server.getActiveMQServerControl().closeConnectionsForAddress("127.0.0.1");
+      }
+
+      if (verification != null)
+      {
+         throw verification;
+      }
+
+      assertEquals(0, queue1.getConsumerCount());
+      assertEquals(0, queue2.getConsumerCount());
+
+      setConditionActive(false);
+
+      locator = internalCreateNonHALocator(true);
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(0);
+      locator.setConsumerWindowSize(-1);
+
+      sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+      session = sf.createSession(true, true, 0);
+
+
+      session.start();
+
+
+      prod = session.createProducer("queue");
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("i", i);
+         prod.send(message);
+      }
+
+      consumer = session.createConsumer("queue1");
+      consumer2 = session.createConsumer("queue2");
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+      {
+         assertNotNull(consumer.receive(5000));
+         assertNotNull(consumer2.receive(5000));
+      }
+
+      session.close();
+   }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/PagingLeakTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/PagingLeakTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/PagingLeakTest.java
new file mode 100644
index 0000000..2da8445
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/PagingLeakTest.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServers;
+import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class PagingLeakTest extends ServiceTestBase
+{
+
+   private static final AtomicInteger pagePosInstances = new AtomicInteger(0);
+
+   public static void newPosition()
+   {
+      pagePosInstances.incrementAndGet();
+   }
+
+   public static void deletePosition()
+   {
+      pagePosInstances.decrementAndGet();
+   }
+
+   @Before
+   public void setup()
+   {
+      pagePosInstances.set(0);
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "newPosition",
+                     targetClass = "org.apache.activemq.core.paging.cursor.impl.PagePositionImpl",
+                     targetMethod = "<init>()",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.PagingLeakTest.newPosition()"
+                  ),
+               @BMRule
+                  (
+                     name = "finalPosition",
+                     targetClass = "org.apache.activemq.core.paging.cursor.impl.PagePositionImpl",
+                     targetMethod = "finalize",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.PagingLeakTest.deletePosition()"
+                  )
+            }
+      )
+   public void testValidateLeak() throws Throwable
+   {
+
+      List<PagePositionImpl> positions = new ArrayList<PagePositionImpl>();
+
+      for (int i = 0; i < 300; i++)
+      {
+         positions.add(new PagePositionImpl(3, 3));
+      }
+
+      long timeout = System.currentTimeMillis() + 5000;
+      while (pagePosInstances.get() != 300 && timeout > System.currentTimeMillis())
+      {
+         forceGC();
+      }
+
+      // This is just to validate the rules are correctly applied on byteman
+      assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 300, pagePosInstances.get());
+
+      positions.clear();
+
+      timeout = System.currentTimeMillis() + 5000;
+      while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis())
+      {
+         forceGC();
+      }
+
+      // This is just to validate the rules are correctly applied on byteman
+      assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get());
+
+      final ArrayList<Exception> errors = new ArrayList<Exception>();
+      // A backup that will be waiting to be activated
+      Configuration conf = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, true);
+      addServer(server);
+
+
+      server.start();
+
+
+      AddressSettings settings = new AddressSettings();
+      settings.setPageSizeBytes(20 * 1024);
+      settings.setMaxSizeBytes(200 * 1024);
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+
+      final SimpleString address = new SimpleString("pgdAddress");
+
+      class Consumer extends Thread
+      {
+         final ServerLocator locator;
+         final ClientSessionFactory sf;
+         final ClientSession session;
+         final ClientConsumer consumer;
+
+         final int sleepTime;
+         final int maxConsumed;
+
+         Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception
+         {
+
+            server.createQueue(address, address.concat(suffix), null, true, false);
+
+            this.sleepTime = sleepTime;
+            locator = createInVMLocator(0);
+            sf = locator.createSessionFactory();
+            session = sf.createSession(true, true);
+            consumer = session.createConsumer(address.concat(suffix));
+
+            this.maxConsumed = maxConsumed;
+         }
+
+         public void run()
+         {
+            try
+            {
+               session.start();
+
+               long lastTime = System.currentTimeMillis();
+
+               for (long i = 0; i < maxConsumed; i++)
+               {
+                  ClientMessage msg = consumer.receive(5000);
+
+                  if (msg == null)
+                  {
+                     errors.add(new Exception("didn't receive a message"));
+                     return;
+                  }
+
+                  msg.acknowledge();
+
+
+                  if (sleepTime > 0)
+                  {
+
+                     Thread.sleep(sleepTime);
+                  }
+
+                  if (i % 1000 == 0)
+                  {
+                     System.out.println("Consumed " + i + " events in " + (System.currentTimeMillis() - lastTime));
+                     lastTime = System.currentTimeMillis();
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+
+
+      int numberOfMessages = 10000;
+
+      Consumer consumer1 = new Consumer(100, "-1", 150);
+      Consumer consumer2 = new Consumer(0, "-2", numberOfMessages);
+
+      final ServerLocator locator = createInVMLocator(0);
+      final ClientSessionFactory sf = locator.createSessionFactory();
+      final ClientSession session = sf.createSession(true, true);
+      final ClientProducer producer = session.createProducer(address);
+
+
+      byte[] b = new byte[1024];
+
+
+      for (long i = 0; i < numberOfMessages; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.getBodyBuffer().writeBytes(b);
+         producer.send(msg);
+
+         if (i == 1000)
+         {
+            System.out.println("Starting consumers!!!");
+            consumer1.start();
+            consumer2.start();
+         }
+
+         if (i % 1000 == 0)
+         {
+            validateInstances();
+         }
+
+      }
+
+
+      consumer1.join();
+      consumer2.join();
+
+      validateInstances();
+      Throwable elast = null;
+
+      for (Throwable e : errors)
+      {
+         e.printStackTrace();
+         elast = e;
+      }
+
+      if (elast != null)
+      {
+         throw elast;
+      }
+
+   }
+
+   private void validateInstances()
+   {
+      forceGC();
+      int count2 = pagePosInstances.get();
+      Assert.assertTrue("There is a leak, you shouldn't have this many instances (" + count2 + ")", count2 < 5000);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ReplicationBackupTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ReplicationBackupTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ReplicationBackupTest.java
new file mode 100644
index 0000000..15c987c
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ReplicationBackupTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.apache.activemq.tests.util.TransportConfigurationUtils;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class ReplicationBackupTest extends ServiceTestBase
+{
+   private static final CountDownLatch ruleFired = new CountDownLatch(1);
+   private ActiveMQServer backupServer;
+   private ActiveMQServer liveServer;
+
+   /*
+   * simple test to induce a potential race condition where the server's acceptors are active, but the server's
+   * state != STARTED
+   */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "prevent backup annoucement",
+                     targetClass = "org.apache.activemq.core.server.impl.SharedNothingLiveActivation",
+                     targetMethod = "run",
+                     targetLocation = "AT EXIT",
+                     action = "org.apache.activemq.tests.extras.byteman.ReplicationBackupTest.breakIt();"
+                  )
+            }
+      )
+   public void testReplicatedBackupAnnouncement() throws Exception
+   {
+      TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+      TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+      TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+      final String suffix = "_backup";
+
+      Configuration backupConfig = createDefaultConfig()
+         .setBindingsDirectory(ActiveMQDefaultConfiguration.getDefaultBindingsDirectory() + suffix)
+         .setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix)
+         .setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix)
+         .setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix);
+
+      Configuration liveConfig = createDefaultConfig();
+
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+      liveServer = createServer(liveConfig);
+
+      // start the live server in a new thread so we can start the backup simultaneously to induce a potential race
+      Thread startThread = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               liveServer.start();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      });
+      startThread.start();
+
+      ruleFired.await();
+
+      backupServer = createServer(backupConfig);
+      backupServer.start();
+      ServiceTestBase.waitForRemoteBackup(null, 3, true, backupServer);
+   }
+
+   public static void breakIt()
+   {
+      ruleFired.countDown();
+      try
+      {
+         /* before the fix this sleep would put the "live" server into a state where the acceptors were started
+          * but the server's state != STARTED which would cause the backup to fail to announce
+          */
+         Thread.sleep(2000);
+      }
+      catch (InterruptedException e)
+      {
+         e.printStackTrace();
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailoverTest.java
new file mode 100644
index 0000000..d06b093
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailoverTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.core.config.ScaleDownConfiguration;
+import org.apache.activemq.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class ScaleDownFailoverTest extends ClusterTestBase
+{
+   protected static int stopCount = 0;
+   private static ActiveMQServer[] staticServers;
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      stopCount = 0;
+      setupLiveServer(0, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(1, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(2, isFileStorage(), false, isNetty(), true);
+      ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+      ScaleDownConfiguration scaleDownConfiguration2 = new ScaleDownConfiguration();
+      scaleDownConfiguration2.setEnabled(false);
+      ScaleDownConfiguration scaleDownConfiguration3 = new ScaleDownConfiguration();
+      scaleDownConfiguration3.setEnabled(false);
+      ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
+      ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration2);
+      ((LiveOnlyPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration3);
+      if (isGrouped())
+      {
+         scaleDownConfiguration.setGroupName("bill");
+         scaleDownConfiguration2.setGroupName("bill");
+         scaleDownConfiguration3.setGroupName("bill");
+      }
+      staticServers = servers;
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+      scaleDownConfiguration.getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      scaleDownConfiguration2.getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      scaleDownConfiguration3.getConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+
+      startServers(0, 1, 2);
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+   }
+
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isGrouped()
+   {
+      return false;
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      stopServers(0, 1, 2);
+      super.tearDown();
+   }
+
+
+   @Test
+   @BMRule
+      (
+         name = "blow-up",
+         targetClass = "org.apache.activemq.api.core.client.ServerLocator",
+         targetMethod = "createSessionFactory(org.apache.activemq.api.core.TransportConfiguration, int, boolean)",
+         isInterface = true,
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.tests.extras.byteman.ScaleDownFailoverTest.fail($1);"
+      )
+   public void testScaleDownWhenFirstServerFails() throws Exception
+   {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+      final String queueName2 = "testQueue2";
+
+      // create 2 queues on each node mapped to the same address
+      createQueue(0, addressName, queueName1, null, false);
+      createQueue(0, addressName, queueName2, null, false);
+      createQueue(1, addressName, queueName1, null, false);
+      createQueue(1, addressName, queueName2, null, false);
+      createQueue(2, addressName, queueName1, null, false);
+      createQueue(2, addressName, queueName2, null, false);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, false, null);
+
+      // consume a message from node 0
+      addConsumer(0, 0, queueName2, null);
+      ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      removeConsumer(0);
+
+      servers[0].stop();
+
+      // verify that at least one server stopped
+      Assert.assertTrue(!servers[1].isStarted() || !servers[2].isStarted());
+
+      int remainingServer;
+      if (servers[1].isStarted())
+         remainingServer = 1;
+      else
+         remainingServer = 2;
+
+      // get the 2 messages from queue 1
+      addConsumer(0, remainingServer, queueName1, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages on queue 1
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+
+      // get the 1 message from queue 2
+      addConsumer(0, remainingServer, queueName2, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages on queue 1
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+   }
+
+   public static void fail(TransportConfiguration tc)
+   {
+      // only kill one server
+      if (stopCount == 0)
+      {
+         try
+         {
+            for (ActiveMQServer activeMQServer : staticServers)
+            {
+               if (activeMQServer != null)
+               {
+                  for (TransportConfiguration transportConfiguration : activeMQServer.getConfiguration().getAcceptorConfigurations())
+                  {
+                     if (transportConfiguration.getParams().get(TransportConstants.PORT_PROP_NAME).equals(tc.getParams().get(TransportConstants.PORT_PROP_NAME)))
+                     {
+                        activeMQServer.stop();
+                        stopCount++;
+                        System.out.println("Stopping server listening at: " + tc.getParams().get(TransportConstants.PORT_PROP_NAME));
+                     }
+                  }
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailureTest.java
new file mode 100644
index 0000000..fbf46f1
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownFailureTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.core.config.ScaleDownConfiguration;
+import org.apache.activemq.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class ScaleDownFailureTest extends ClusterTestBase
+{
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      setupLiveServer(0, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(1, isFileStorage(), false, isNetty(), true);
+      if (isGrouped())
+      {
+         ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+         scaleDownConfiguration.setGroupName("bill");
+         ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
+         ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
+      }
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+      startServers(0, 1);
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+   }
+
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   protected boolean isGrouped()
+   {
+      return false;
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(null);
+      stopServers(0, 1);
+      super.tearDown();
+   }
+
+   @Test
+   @BMRule
+      (
+         name = "blow-up",
+         targetClass = "org.apache.activemq.api.core.client.ServerLocator",
+         targetMethod = "createSessionFactory(org.apache.activemq.api.core.TransportConfiguration, int, boolean)",
+         isInterface = true,
+         targetLocation = "ENTRY",
+         action = "throw new Exception()"
+      )
+   public void testScaleDownWhenRemoteServerIsUnavailable() throws Exception
+   {
+      final int TEST_SIZE = 1;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+      final String queueName2 = "testQueue2";
+
+      // create 2 queues on each node mapped to the same address
+      createQueue(0, addressName, queueName1, null, false);
+      createQueue(0, addressName, queueName2, null, false);
+      createQueue(1, addressName, queueName1, null, false);
+      createQueue(1, addressName, queueName2, null, false);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, false, null);
+
+      // consume a message from node 0
+      addConsumer(0, 0, queueName2, null, false);
+      ClientMessage clientMessage = consumers[0].getConsumer().receive();
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      consumers[0].getSession().commit();
+      removeConsumer(0);
+
+      servers[0].stop();
+
+      addConsumer(0, 1, queueName1, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailoverTest.java
new file mode 100644
index 0000000..e86d4d6
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailoverTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+public class ScaleDownGroupedFailoverTest extends ScaleDownFailoverTest
+{
+   protected boolean isGrouped()
+   {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailureTest.java
new file mode 100644
index 0000000..87dd577
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ScaleDownGroupedFailureTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+public class ScaleDownGroupedFailureTest extends ScaleDownFailureTest
+{
+   protected boolean isGrouped()
+   {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StartStopDeadlockTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StartStopDeadlockTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StartStopDeadlockTest.java
new file mode 100644
index 0000000..fa8dfdc
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StartStopDeadlockTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.core.registry.JndiBindingRegistry;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServers;
+import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.tests.unit.util.InVMNamingContext;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * This test validates a deadlock identified by https://bugzilla.redhat.com/show_bug.cgi?id=959616
+ *
+ * @author Clebert
+ */
+@RunWith(BMUnitRunner.class)
+public class StartStopDeadlockTest extends ServiceTestBase
+{
+   /*
+   * simple test to make sure connect still works with some network latency  built into netty
+   * */
+   @Test
+   @BMRules
+      (
+
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "Server.start wait-init",
+                     targetClass = "org.apache.activemq.core.server.impl.ActiveMQServerImpl",
+                     targetMethod = "initialisePart2",
+                     targetLocation = "ENTRY",
+                     condition = "incrementCounter(\"server-Init\") == 2",
+                     action = "System.out.println(\"server backup init\"), waitFor(\"start-init\")"
+                  ),
+               @BMRule(
+                  name = "JMSServer.stop wait-init",
+                  targetClass = "org.apache.activemq.jms.server.impl.JMSServerManagerImpl",
+                  targetMethod = "stop",
+                  targetLocation = "ENTRY",
+                  action = "signalWake(\"start-init\", true)"
+               ),
+               @BMRule(
+                  name = "StartStopDeadlockTest tearDown",
+                  targetClass = "org.apache.activemq.tests.extras.byteman.StartStopDeadlockTest",
+                  targetMethod = "tearDown",
+                  targetLocation = "ENTRY",
+                  action = "deleteCounter(\"server-Init\")"
+               )
+            }
+      )
+   public void testDeadlock() throws Exception
+   {
+
+      // A live server that will always be crashed
+      Configuration confLive = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration())
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      final ActiveMQServer serverLive = ActiveMQServers.newActiveMQServer(confLive);
+      serverLive.start();
+      addServer(serverLive);
+
+
+      // A backup that will be waiting to be activated
+      Configuration conf = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration())
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, true);
+      addServer(server);
+
+      final JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+      final InVMNamingContext context = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(context));
+
+      jmsServer.start();
+
+      final AtomicInteger errors = new AtomicInteger(0);
+      final CountDownLatch align = new CountDownLatch(2);
+      final CountDownLatch startLatch = new CountDownLatch(1);
+
+
+      Thread tCrasher = new Thread("tStart")
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               align.countDown();
+               startLatch.await();
+               System.out.println("Crashing....");
+               serverLive.stop(true);
+            }
+            catch (Exception e)
+            {
+               errors.incrementAndGet();
+               e.printStackTrace();
+            }
+         }
+      };
+
+      Thread tStop = new Thread("tStop")
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               align.countDown();
+               startLatch.await();
+               jmsServer.stop();
+            }
+            catch (Exception e)
+            {
+               errors.incrementAndGet();
+               e.printStackTrace();
+            }
+         }
+      };
+
+      tCrasher.start();
+      tStop.start();
+      align.await();
+      startLatch.countDown();
+
+      tCrasher.join();
+      tStop.join();
+
+      assertEquals(0, errors.get());
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StompInternalStateTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StompInternalStateTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StompInternalStateTest.java
new file mode 100644
index 0000000..12037f4
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/StompInternalStateTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.core.management.CoreNotificationType;
+import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory;
+import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.management.Notification;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class StompInternalStateTest extends ServiceTestBase
+{
+   private static final String STOMP_QUEUE_NAME = "jms.queue.StompTestQueue";
+
+   private static volatile String resultTestStompProtocolManagerLeak = null;
+
+   protected ActiveMQServer server = null;
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "StompProtocolManager Leak Server Rule",
+                     targetClass = "org.apache.activemq.core.protocol.stomp.StompProtocolManager",
+                     targetMethod = "onNotification(org.apache.activemq.core.server.management.Notification)",
+                     targetLocation = "EXIT",
+                     helper = "org.apache.activemq.tests.extras.byteman.StompInternalStateTest",
+                     action = "verifyBindingAddRemove($1, $0.destinations)"
+                  )
+            }
+      )
+   public void testStompProtocolManagerLeak() throws Exception
+   {
+      ClientSession session = null;
+      try
+      {
+         assertNull(resultTestStompProtocolManagerLeak);
+         ServerLocator locator = createNettyNonHALocator();
+         ClientSessionFactory factory = createSessionFactory(locator);
+         session = factory.createSession();
+         session.createTemporaryQueue(STOMP_QUEUE_NAME, STOMP_QUEUE_NAME);
+         session.deleteQueue(STOMP_QUEUE_NAME);
+
+         assertNull(resultTestStompProtocolManagerLeak);
+      }
+      finally
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+   }
+
+   @Override
+   protected Configuration createDefaultConfig(final boolean netty) throws Exception
+   {
+      Configuration config = super.createDefaultConfig(netty)
+         .setSecurityEnabled(false)
+         .setPersistenceEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
+      return config;
+   }
+
+   @SuppressWarnings("unchecked")
+   public void verifyBindingAddRemove(Notification noti, Object obj)
+   {
+      Set<String> destinations = (Set<String>)obj;
+      if (noti.getType() == CoreNotificationType.BINDING_ADDED)
+      {
+         if (!destinations.contains(STOMP_QUEUE_NAME))
+         {
+            resultTestStompProtocolManagerLeak += "didn't save the queue when binding added " + destinations;
+         }
+      }
+      else if (noti.getType() == CoreNotificationType.BINDING_REMOVED)
+      {
+         if (destinations.contains(STOMP_QUEUE_NAME))
+         {
+            resultTestStompProtocolManagerLeak = "didn't remove the queue when binding removed " + destinations;
+         }
+      }
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      server = createServer(createDefaultConfig(true));
+      server.start();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      server.stop();
+      super.tearDown();
+   }
+}


Mime
View raw message