activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [14/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:44 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f34dcb4..5576fc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,6 +7,7 @@
    <version>2.5.0-SNAPSHOT</version>
    <modules>
       <module>hornetq-protocols</module>
+     <module>hornetq-dto</module>
    </modules>
 
    <name>HornetQ Parent</name>
@@ -45,7 +46,8 @@
          ${hornetq.version.majorVersion}.${hornetq.version.minorVersion}.${hornetq.version.microVersion}.${hornetq.version.versionSuffix}
          (${hornetq.version.versionName}, ${hornetq.version.incrementingVersion})
       </HornetQ-Version>
-      <resteasy.version>3.0.4.Final</resteasy.version>
+      <resteasy.version>3.0.9.Final</resteasy.version>
+      <jackson-databind.version>2.3.1</jackson-databind.version>
       <skipUnitTests>true</skipUnitTests>
       <skipJmsTests>true</skipJmsTests>
       <skipBytemanTests>true</skipBytemanTests>
@@ -209,10 +211,9 @@
          </dependency>
          <dependency>
             <groupId>org.jboss.spec.javax.transaction</groupId>
-            <artifactId>jboss-transaction-api_1.1_spec</artifactId>
+            <artifactId>jboss-transaction-api_1.2_spec</artifactId>
             <version>1.0.0.Final</version>
          </dependency>
-
          <!--this specifically for the JMS Bridge-->
          <dependency>
             <groupId>org.jboss</groupId>
@@ -231,6 +232,12 @@
             <artifactId>jbossjts-jacorb</artifactId>
             <version>4.17.13.Final</version>
          </dependency>
+         <!-- this for Ironjacamar SPI XAResourceWrapper implementation -->
+         <dependency>
+            <groupId>org.jboss.ironjacamar</groupId>
+            <artifactId>ironjacamar-core-api</artifactId>
+            <version>1.2.0.Beta2</version>
+         </dependency>
          <!--needed to compile security-->
          <dependency>
             <groupId>org.jboss.security</groupId>
@@ -244,58 +251,15 @@
          </dependency>
          <!--needed to compile the bootstrap jar-->
          <dependency>
-            <groupId>org.jboss.microcontainer</groupId>
-            <artifactId>jboss-kernel</artifactId>
-            <version>2.0.6.GA</version>
-            <exclusions>
-               <exclusion>
-                  <groupId>org.jboss.logging</groupId>
-                  <artifactId>jboss-logging-spi</artifactId>
-               </exclusion>
-            </exclusions>
-         </dependency>
-         <dependency>
-            <groupId>org.jboss</groupId>
-            <artifactId>jboss-common-core</artifactId>
-            <version>2.2.14.GA</version>
-            <exclusions>
-               <exclusion>
-                  <groupId>org.jboss.logging</groupId>
-                  <artifactId>jboss-logging-spi</artifactId>
-               </exclusion>
-            </exclusions>
-         </dependency>
-         <dependency>
             <groupId>org.jgroups</groupId>
             <artifactId>jgroups</artifactId>
             <version>3.3.4.Final</version>
          </dependency>
-         <!--needed for microntainer deps in distro-->
-         <dependency>
-            <groupId>org.jboss.microcontainer</groupId>
-            <artifactId>jboss-dependency</artifactId>
-            <version>2.0.6.GA</version>
-         </dependency>
-         <dependency>
-            <groupId>org.jboss</groupId>
-            <artifactId>jboss-reflect</artifactId>
-            <version>2.0.2.GA</version>
-         </dependency>
-         <dependency>
-            <groupId>org.jboss</groupId>
-            <artifactId>jboss-mdr</artifactId>
-            <version>2.0.1.GA</version>
-         </dependency>
-         <dependency>
-            <groupId>org.jboss</groupId>
-            <artifactId>jbossxb</artifactId>
-            <version>2.0.1.GA</version>
-         </dependency>
-         <dependency>
-            <groupId>sun-jaxb</groupId>
-            <artifactId>jaxb-api</artifactId>
-            <version>2.1.9</version>
-         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>airline</artifactId>
+            <version>0.6</version>
+        </dependency>
          <!--needed to compile transport jar-->
          <dependency>
             <groupId>io.netty</groupId>
@@ -324,19 +288,25 @@
 
          <dependency>
             <groupId>org.apache.qpid</groupId>
-            <artifactId>proton-api</artifactId>
-            <version>0.5</version>
+            <artifactId>proton-j</artifactId>
+            <version>0.8</version>
          </dependency>
          <dependency>
             <groupId>org.apache.qpid</groupId>
-            <artifactId>proton-j-impl</artifactId>
-            <version>0.5</version>
+            <artifactId>proton-jms</artifactId>
+            <version>0.8</version>
          </dependency>
          <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>proton-jms</artifactId>
-            <version>0.5</version>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>5.10.0</version>
          </dependency>
+         <dependency>
+           <groupId>org.slf4j</groupId>
+           <artifactId>slf4j-api</artifactId>
+           <version>1.7.5</version>
+         </dependency>
+
          <!--needed to compile the spring support-->
          <dependency>
             <groupId>org.springframework</groupId>
@@ -397,7 +367,6 @@
             <artifactId>jboss-jaspi-api</artifactId>
             <version>1.0.0.GA</version>
          </dependency>
-
          <!--needed to run the jms tests -->
          <dependency>
             <groupId>org.jboss.naming</groupId>
@@ -432,6 +401,12 @@
             <groupId>org.jboss.javaee</groupId>
             <artifactId>jboss-jca-api</artifactId>
             <version>1.5.0.GA</version>
+            <exclusions>
+               <exclusion>
+                  <groupId>org.jboss.logging</groupId>
+                  <artifactId>jboss-logging-spi</artifactId>
+               </exclusion>
+            </exclusions>
          </dependency>
 
          <!-- needed for javadoc graphics-->
@@ -457,6 +432,25 @@
          <name>JBoss releases</name>
          <url>https://repository.jboss.org/nexus/content/groups/public/</url>
       </repository>
+      <!--
+        This is a repository for intermediate releases from Proton.
+        If an API is broken, we will publish an intermediate release here; in that case we will uncomment this and
+        use this repository again.
+
+      <repository>
+         <snapshots>
+            <enabled>false</enabled>
+            <updatePolicy>never</updatePolicy>
+         </snapshots>
+         <releases>
+            <enabled>true</enabled>
+            <updatePolicy>interval:10080</updatePolicy>
+         </releases>
+         <id>fuse.release</id>
+         <name>Fuse releases</name>
+         <url>https://repository.jboss.org/nexus/content/repositories/fs-releases/</url>
+      </repository>
+      -->
    </repositories>
    <pluginRepositories>
       <pluginRepository>
@@ -482,6 +476,7 @@
             <activeByDefault>true</activeByDefault>
          </activation>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -506,6 +501,7 @@
       <profile>
          <id>maven-release</id>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -530,6 +526,7 @@
       <profile>
          <id>release</id>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -556,6 +553,7 @@
       <profile>
          <id>hudson-tests</id>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -593,6 +591,7 @@
       <profile>
          <id>jenkins-fast-tests</id>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -626,6 +625,7 @@
       <profile>
          <id>examples</id>
          <modules>
+            <module>hornetq-dto</module>
             <module>hornetq-bootstrap</module>
             <module>hornetq-commons</module>
             <module>hornetq-selector</module>
@@ -847,6 +847,15 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-checkstyle-plugin</artifactId>
             <version>2.12</version>
+            <dependencies>
+               <!-- This was initially done to enforce a name on the Parameter annotation.
+                    A customized check was developed for that, and this jar is needed to deploy the specialized checkstyle. -->
+               <dependency>
+                  <groupId>org.hornetq</groupId>
+                  <artifactId>hornetq-checkstyle-checks</artifactId>
+                  <version>0.2</version>
+               </dependency>
+            </dependencies>
             <configuration>
                <skip>${skipStyleCheck}</skip>
                <configLocation>${hornetq.basedir}/etc/checkstyle.xml</configLocation>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/pom.xml b/tests/byteman-tests/pom.xml
index bac33d0..cd3bffa 100644
--- a/tests/byteman-tests/pom.xml
+++ b/tests/byteman-tests/pom.xml
@@ -13,7 +13,7 @@
 
    <properties>
       <tools.jar>${java.home}/../lib/tools.jar</tools.jar>
-      <byteman.version>2.1.4.1</byteman.version>
+      <byteman.version>2.2.0</byteman.version>
       <hornetq.basedir>${project.basedir}/../..</hornetq.basedir>
    </properties>
 
@@ -132,7 +132,7 @@
       </dependency>
       <dependency>
          <groupId>org.jboss.spec.javax.transaction</groupId>
-         <artifactId>jboss-transaction-api_1.1_spec</artifactId>
+         <artifactId>jboss-transaction-api_1.2_spec</artifactId>
       </dependency>
       <!--this specifically for the JMS Bridge -->
       <dependency>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java
index 4182c8c..4724891 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BMFailoverTest.java
@@ -19,6 +19,7 @@ import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQTransactionOutcomeUnknownException;
 import org.hornetq.api.core.HornetQTransactionRolledBackException;
+import org.hornetq.api.core.HornetQUnBlockedException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -27,6 +28,7 @@ import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
@@ -35,11 +37,13 @@ import org.hornetq.core.server.Queue;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.tests.integration.cluster.failover.FailoverTestBase;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.UUIDGenerator;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -61,6 +65,7 @@ public class BMFailoverTest extends FailoverTestBase
    public void setUp() throws Exception
    {
       super.setUp();
+      stopped = false;
       locator = getServerLocator();
    }
 
@@ -71,6 +76,122 @@ public class BMFailoverTest extends FailoverTestBase
       super.tearDown();
    }
 
+   private static boolean stopped = false;
+   public static void stopAndThrow() throws HornetQUnBlockedException
+   {
+      if (!stopped)
+      {
+         try
+         {
+            serverToStop.getServer().stop(true);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         try
+         {
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
+         stopped = true;
+         throw HornetQClientMessageBundle.BUNDLE.unblockingACall(null);
+      }
+   }
+   @Test
+   @BMRules
+   (
+         rules =
+               {
+                     @BMRule
+                           (
+                                 name = "trace HornetQSessionContext xaEnd",
+                                 targetClass = "org.hornetq.core.protocol.core.impl.HornetQSessionContext",
+                                 targetMethod = "xaEnd",
+                                 targetLocation = "AT EXIT",
+                                 action = "org.hornetq.byteman.tests.BMFailoverTest.stopAndThrow()"
+                           )
+               }
+   )
+   //https://bugzilla.redhat.com/show_bug.cgi?id=1152410
+   public void testFailOnEndAndRetry() throws Exception
+   {
+      serverToStop = liveServer;
+
+      createSessionFactory();
+
+      ClientSession session = createSession(sf, true, false, false);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      for (int i = 0; i < 100; i++)
+      {
+         producer.send(createMessage(session, i, true));
+      }
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      Xid xid = RandomUtil.randomXid();
+
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.start();
+      // Receive MSGs but don't ack!
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         Assert.assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+      }
+      try
+      {
+         //top level prepare
+         session.end(xid, XAResource.TMSUCCESS);
+      }
+      catch (XAException e)
+      {
+         try
+         {
+            //top level abort
+            session.end(xid, XAResource.TMFAIL);
+         }
+         catch (XAException e1)
+         {
+            try
+            {
+               //rollback
+               session.rollback(xid);
+            }
+            catch (XAException e2)
+            {
+            }
+         }
+      }
+      xid = RandomUtil.randomXid();
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      for (int i = 0; i < 50; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         Assert.assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+      }
+      session.end(xid, XAResource.TMSUCCESS);
+      session.commit(xid, true);
+   }
+
    @Test
    @BMRules
       (
@@ -170,7 +291,7 @@ public class BMFailoverTest extends FailoverTestBase
       //let's close the consumer so anything pending is handled
       consumer.close();
 
-      assertTrue("actual message count=" + inQ.getMessageCount(), inQ.getMessageCount() == 1);
+      assertEquals(1, getMessageCount(inQ));
    }
 
 
@@ -212,7 +333,7 @@ public class BMFailoverTest extends FailoverTestBase
       sendMessages(session, producer, 10);
       session.commit();
       Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
-      assertTrue(bindable.getMessageCount() == 10);
+      assertEquals(10, getMessageCount(bindable));
    }
 
    @Test
@@ -266,7 +387,8 @@ public class BMFailoverTest extends FailoverTestBase
          //pass
       }
       Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
-      assertTrue("messager count = " + bindable.getMessageCount(), bindable.getMessageCount() == 10);
+      assertEquals(10, getMessageCount(bindable));
+
    }
 
    @Override
@@ -303,6 +425,13 @@ public class BMFailoverTest extends FailoverTestBase
       return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
    }
 
+   protected ClientSession
+   createSession(ClientSessionFactory sf1, boolean xa, boolean autoCommitSends,   boolean autoCommitAcks) throws Exception
+   {
+      return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
+   }
+
+
    private void createSessionFactory() throws Exception
    {
       locator.setBlockOnNonDurableSend(true);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java
index 7b28da0..c30b7b9 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/BridgeServerLocatorConfigurationTest.java
@@ -17,9 +17,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.hornetq.api.config.HornetQDefaultConfiguration;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.CoreQueueConfiguration;
@@ -98,24 +96,31 @@ public class BridgeServerLocatorConfigurationTest extends ServiceTestBase
          ArrayList<String> staticConnectors = new ArrayList<String>();
          staticConnectors.add(server1tc.getName());
 
-         BridgeConfiguration bridgeConfiguration =
-                  new BridgeConfiguration(BRIDGE_NAME, queueName0, forwardAddress, null, null,
-                                          HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                          HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, BRIDGE_TTL, 1000,
-                                          HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, 1d, -1, 0, 0, true, 1024,
-                                          staticConnectors, false, HornetQDefaultConfiguration.getDefaultClusterUser(),
-                                          HornetQDefaultConfiguration.getDefaultClusterPassword());
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+            .setName(BRIDGE_NAME)
+            .setQueueName(queueName0)
+            .setForwardingAddress(forwardAddress)
+            .setConnectionTTL(BRIDGE_TTL)
+            .setRetryInterval(1000)
+            .setReconnectAttempts(0)
+            .setReconnectAttemptsOnSameNode(0)
+            .setConfirmationWindowSize(1024)
+            .setStaticConnectors(staticConnectors);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
          bridgeConfigs.add(bridgeConfiguration);
          serverWithBridge.getConfiguration().setBridgeConfigurations(bridgeConfigs);
 
-         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
+            .setAddress(testAddress)
+            .setName(queueName0);
          List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
          queueConfigs0.add(queueConfig0);
          serverWithBridge.getConfiguration().setQueueConfigurations(queueConfigs0);
 
-         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+            .setAddress(forwardAddress)
+            .setName(queueName1);
          List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
          queueConfigs1.add(queueConfig1);
          server1.getConfiguration().setQueueConfigurations(queueConfigs1);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java
index 19852f9..cd27a49 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ClusteredGroupingTest.java
@@ -19,7 +19,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.Response;
@@ -419,7 +419,7 @@ public class ClusteredGroupingTest extends ClusterTestBase
 
    public static void pause2(Notification notification)
    {
-      if (notification.getType() == NotificationType.BINDING_REMOVED)
+      if (notification.getType() == CoreNotificationType.BINDING_REMOVED)
       {
          SimpleString clusterName = notification.getProperties()
             .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java
index 178643f..e800d79 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/HornetQMessageHandlerTest.java
@@ -96,6 +96,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase
 
       HornetQActivationSpec spec = new HornetQActivationSpec();
       spec.setMaxSession(1);
+      spec.setCallTimeout(1000L);
       spec.setResourceAdapter(qResourceAdapter);
       spec.setUseJNDI(false);
       spec.setDestinationType("javax.jms.Queue");
@@ -131,7 +132,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase
       qResourceAdapter.stop();
 
       Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
-      assertEquals(1, ((Queue) binding.getBindable()).getMessageCount());
+      assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
 
       server.stop();
       server.start();
@@ -175,6 +176,7 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase
 
       HornetQActivationSpec spec = new HornetQActivationSpec();
       spec.setMaxSession(1);
+      spec.setCallTimeout(1000L);
       spec.setResourceAdapter(qResourceAdapter);
       spec.setUseJNDI(false);
       spec.setDestinationType("javax.jms.Queue");
@@ -210,7 +212,8 @@ public class HornetQMessageHandlerTest extends HornetQRATestBase
       qResourceAdapter.stop();
 
       Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
-      assertEquals(1, ((Queue) binding.getBindable()).getMessageCount());
+      assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
+
 
       server.stop();
       server.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java
new file mode 100644
index 0000000..e0db2e3
--- /dev/null
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/JMSBridgeReconnectionTest.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.byteman.tests;
+
+import org.hornetq.core.client.impl.ClientProducerCredits;
+import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.jms.bridge.ConnectionFactoryFactory;
+import org.hornetq.jms.bridge.QualityOfServiceMode;
+import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.tests.integration.jms.bridge.BridgeTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(BMUnitRunner.class)
+public class JMSBridgeReconnectionTest extends BridgeTestBase
+{
+
+   @Test
+   @BMRules
+         (
+               rules =
+                     {
+                           @BMRule
+                                 (
+                                       name = "trace clientsessionimpl send",
+                                       targetClass = "org.hornetq.core.protocol.core.impl.ChannelImpl",
+                                       targetMethod = "send",
+                                       targetLocation = "ENTRY",
+                                       action = "org.hornetq.byteman.tests.JMSBridgeReconnectionTest.pause($1);"
+                                 ),
+                           @BMRule
+                                 (
+                                       name = "trace sendRegularMessage",
+                                       targetClass = "org.hornetq.core.client.impl.ClientProducerImpl",
+                                       targetMethod = "sendRegularMessage",
+                                       targetLocation = "ENTRY",
+                                       action = "org.hornetq.byteman.tests.JMSBridgeReconnectionTest.pause2($1,$2,$3);"
+                                 )
+                     }
+         )
+   public void performCrashDestinationStopBridge() throws Exception
+   {
+      hornetQServer = jmsServer1;
+      ConnectionFactoryFactory factInUse0 = cff0;
+      ConnectionFactoryFactory factInUse1 = cff1;
+      final JMSBridgeImpl bridge =
+            new JMSBridgeImpl(factInUse0,
+                  factInUse1,
+                  sourceQueueFactory,
+                  targetQueueFactory,
+                  null,
+                  null,
+                  null,
+                  null,
+                  null,
+                  1000,
+                  -1,
+                  QualityOfServiceMode.DUPLICATES_OK,
+                  10,
+                  -1,
+                  null,
+                  null,
+                  false);
+
+      addHornetQComponent(bridge);
+      bridge.setTransactionManager(newTransactionManager());
+      bridge.start();
+      final CountDownLatch latch = new CountDownLatch(20);
+      Thread clientThread = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            while (bridge.isStarted())
+            {
+               try
+               {
+                  sendMessages(cf0, sourceQueue, 0, 1, false, false);
+                  latch.countDown();
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         }
+      });
+
+      clientThread.start();
+
+      stopLatch.await(10000, TimeUnit.MILLISECONDS);
+
+      bridge.stop();
+
+      clientThread.join(5000);
+
+      assertTrue(!clientThread.isAlive());
+   }
+
+   public static void pause(Packet packet)
+   {
+      if (packet.getType() == PacketImpl.SESS_SEND)
+      {
+         SessionSendMessage sendMessage = (SessionSendMessage) packet;
+         if (sendMessage.getMessage().containsProperty("__HQ_CID") && count < 0 && !stopped)
+         {
+            try
+            {
+               hornetQServer.stop();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+            stopped = true;
+            try
+            {
+               Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            stopLatch.countDown();
+         }
+      }
+   }
+
+   static JMSServerManager hornetQServer;
+   static boolean stopped = false;
+   static int count = 20;
+   static CountDownLatch stopLatch = new CountDownLatch(1);
+   public static void pause2(MessageInternal msgI, boolean sendBlocking, final ClientProducerCredits theCredits)
+   {
+      if (msgI.containsProperty("__HQ_CID"))
+      {
+         count--;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java
new file mode 100644
index 0000000..6a278fe
--- /dev/null
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/OrphanedConsumerTest.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.byteman.tests;
+
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Clebert Suconic
+ */
+@RunWith(BMUnitRunner.class)
+public class OrphanedConsumerTest extends ServiceTestBase
+{
+
+   private static boolean conditionActive = true; // read by the Byteman rule conditions through isConditionActive()
+
+   public static final boolean isConditionActive()
+   {
+      return conditionActive;
+   }
+
+
+   public static final void setConditionActive(boolean _conditionActive)
+   {
+      conditionActive = _conditionActive; // the Byteman rule conditions of this class evaluate this flag
+   }
+
+
+   public static void throwException() throws Exception
+   {
+      throw new InterruptedException("nice.. I interrupted this!"); // unconditional; no rule visible in this class calls it
+   }
+
+   private HornetQServer server;
+
+   private ServerLocator locator;
+
+   static HornetQServer staticServer; // static copy of the server so that the static Byteman hook can reach it
+
+   /**
+    * Set by {@link #leavingCloseOnTestCountersWhileClosing()} when it detects a problem.
+    * The test must check that this is still null and rethrow it otherwise.
+    */
+   static AssertionError verification;
+
+   /**
+    * Entry point for the "closeEnter" Byteman rules of both tests; records an error if the connection or the sessions are already gone.
+    */
+   public static void leavingCloseOnTestCountersWhileClosing()
+   {
+      if (staticServer.getConnectionCount() == 0)
+      {
+         verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!");
+      }
+
+      if (staticServer.getSessions().size() == 0) // when both checks fail, this error replaces the one above
+      {
+         verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!");
+      }
+   }
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      setConditionActive(true);
+      verification = null; // static field: an error recorded by a previous test must not fail this one
+      // The internal method is used here because closing this locator on tear down would hang,
+      // as the test tweaks the internal state and makes it fail.
+      locator = internalCreateNonHALocator(true);
+   }
+
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+      setConditionActive(false); // leave the Byteman rule conditions switched off once the test is over
+
+      staticServer = null; // drop the static reference to this test's server
+   }
+
+
+   /**
+    * Two checks in one test, with the connection failure provoked by stopping the client pings:
+    * I - an exception thrown while closing a consumer must not prevent the connection from being closed
+    * II - the connection must only be removed at the end of the process, so that management never
+    *      shows an inconsistent state
+    * @throws Exception if the test fails with an unexpected error
+    */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "closeExit",
+                     targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "AT EXIT",
+                     condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
+                     action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
+                  ),
+               @BMRule
+                  (
+                     name = "closeEnter",
+                     targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "ENTRY",
+                     condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
+                     action = "org.hornetq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
+                  )
+
+            }
+      )
+   public void testOrphanedConsumers() throws Exception
+   {
+      internalTestOrphanedConsumers(false);
+   }
+
+
+   /**
+    * Two checks in one test, with the connection failure provoked through a management operation:
+    * I - an exception thrown while closing a consumer must not prevent the connection from being closed
+    * II - the connection must only be removed at the end of the process, so that management never
+    *      shows an inconsistent state
+    * @throws Exception if the test fails with an unexpected error
+    */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "closeExit",
+                     targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "AT EXIT",
+                     condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
+                     action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
+                  ),
+               @BMRule
+                  (
+                     name = "closeEnter",
+                     targetClass = "org.hornetq.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "ENTRY",
+                     condition = "org.hornetq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
+                     action = "org.hornetq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
+                  )
+
+            }
+      )
+   public void testOrphanedConsumersByManagement() throws Exception
+   {
+      internalTestOrphanedConsumers(true);
+   }
+
+   /**
+    * Fails the first connection, checks that no consumer is left on the queues, then checks that a fresh connection receives every message.
+    * @param useManagement true = the connection failure is caused by a management operation, false = by stopping the client pings
+    * @throws Exception if anything unexpected goes wrong
+    */
+   private void internalTestOrphanedConsumers(boolean useManagement) throws Exception
+   {
+      final int NUMBER_OF_MESSAGES = 2;
+      server = createServer(true, true);
+      server.start();
+      staticServer = server;
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(true);
+      locator.setConnectionTTL(1000);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setReconnectAttempts(0);
+      // The consumer window size is not what this test is about.
+      // Every message must be delivered,
+      // as we assert on the number of consumers available and on round-robin delivery
+      locator.setConsumerWindowSize(-1);
+
+      ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(true, true, 0);
+
+      session.createQueue("queue", "queue1", true);
+      session.createQueue("queue", "queue2", true);
+
+      ClientProducer prod = session.createProducer("queue");
+
+      ClientConsumer consumer = session.createConsumer("queue1");
+      ClientConsumer consumer2 = session.createConsumer("queue2");
+
+
+      Queue queue1 = server.locateQueue(new SimpleString("queue1"));
+
+      Queue queue2 = server.locateQueue(new SimpleString("queue2"));
+
+      session.start();
+
+
+      if (!useManagement)
+      {
+         sf.stopPingingAfterOne();
+
+         // wait up to 6 seconds for the server to drop the connection
+         for (long timeout = System.currentTimeMillis() + 6000; timeout > System.currentTimeMillis() && server.getConnectionCount() != 0; )
+         {
+            Thread.sleep(100);
+         }
+
+         // an extra second to avoid races of something closing the session while we are asserting it
+         Thread.sleep(1000);
+      }
+      else
+      {
+         server.getHornetQServerControl().closeConnectionsForAddress("127.0.0.1");
+      }
+
+      if (verification != null) // set by the Byteman hook when the connection or the sessions were gone too early
+      {
+         throw verification;
+      }
+
+      assertEquals(0, queue1.getConsumerCount());
+      assertEquals(0, queue2.getConsumerCount());
+
+      setConditionActive(false); // from here on the Byteman rule conditions evaluate to false
+
+      locator = internalCreateNonHALocator(true);
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(0);
+      locator.setConsumerWindowSize(-1);
+
+      sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+      session = sf.createSession(true, true, 0);
+
+
+      session.start();
+
+
+      prod = session.createProducer("queue");
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("i", i);
+         prod.send(message);
+      }
+
+      consumer = session.createConsumer("queue1");
+      consumer2 = session.createConsumer("queue2");
+
+      // both queues are expected to hand over every message sent to the "queue" address
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+      {
+         assertNotNull(consumer.receive(5000));
+         assertNotNull(consumer2.receive(5000));
+      }
+
+      session.close();
+   }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java
new file mode 100644
index 0000000..654de4c
--- /dev/null
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/PagingLeakTest.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.byteman.tests;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class PagingLeakTest extends ServiceTestBase
+{
+
+   private static final AtomicInteger pagePosInstances = new AtomicInteger(0); // incremented by newPosition(), decremented by deletePosition()
+
+   public static void newPosition()
+   {
+      pagePosInstances.incrementAndGet(); // action of the "newPosition" Byteman rule (PagePositionImpl <init>() entry)
+   }
+
+   public static void deletePosition()
+   {
+      pagePosInstances.decrementAndGet(); // action of the "finalPosition" Byteman rule (PagePositionImpl finalize entry)
+   }
+
+   @Before
+   public void setup()
+   {
+      pagePosInstances.set(0); // the counter is static, so every test starts counting from zero again
+   }
+
+   /** Checks the two counting rules first, then pages messages with a slow and a fast consumer while bounding the live PagePositionImpl count. */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "newPosition",
+                     targetClass = "org.hornetq.core.paging.cursor.impl.PagePositionImpl",
+                     targetMethod = "<init>()",
+                     targetLocation = "ENTRY",
+                     action = "org.hornetq.byteman.tests.PagingLeakTest.newPosition()"
+                  ),
+               @BMRule
+                  (
+                     name = "finalPosition",
+                     targetClass = "org.hornetq.core.paging.cursor.impl.PagePositionImpl",
+                     targetMethod = "finalize",
+                     targetLocation = "ENTRY",
+                     action = "org.hornetq.byteman.tests.PagingLeakTest.deletePosition()"
+                  )
+            }
+      )
+   public void testValidateLeak() throws Throwable
+   {
+      List<PagePositionImpl> positions = new ArrayList<PagePositionImpl>();
+
+      for (int i = 0; i < 300; i++)
+      {
+         positions.add(new PagePositionImpl(3, 3));
+      }
+
+      long timeout = System.currentTimeMillis() + 5000;
+      while (pagePosInstances.get() != 300 && timeout > System.currentTimeMillis())
+      {
+         forceGC();
+      }
+
+      // This is just to validate the rules are correctly applied on byteman
+      assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 300, pagePosInstances.get());
+
+      positions.clear();
+
+      timeout = System.currentTimeMillis() + 5000;
+      while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis())
+      {
+         forceGC();
+      }
+
+      // Same validation for the finalize rule: every position released above must have been counted down again
+      assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get());
+
+      final List<Exception> errors = java.util.Collections.synchronizedList(new ArrayList<Exception>()); // both consumer threads add to it
+      // The server under test: security disabled, plus an "invm" connector
+      Configuration conf = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      final HornetQServer server = HornetQServers.newHornetQServer(conf, true);
+      addServer(server);
+
+
+      server.start();
+
+
+      AddressSettings settings = new AddressSettings();
+      settings.setPageSizeBytes(20 * 1024);
+      settings.setMaxSizeBytes(200 * 1024);
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+
+      final SimpleString address = new SimpleString("pgdAddress");
+
+      class Consumer extends Thread
+      {
+         final ServerLocator locator;
+         final ClientSessionFactory sf;
+         final ClientSession session;
+         final ClientConsumer consumer;
+
+         final int sleepTime;
+         final int maxConsumed;
+
+         Consumer(int sleepTime, String suffix, int maxConsumed) throws Exception
+         {
+
+            server.createQueue(address, address.concat(suffix), null, true, false);
+
+            this.sleepTime = sleepTime;
+            locator = createInVMLocator(0);
+            sf = locator.createSessionFactory();
+            session = sf.createSession(true, true);
+            consumer = session.createConsumer(address.concat(suffix));
+
+            this.maxConsumed = maxConsumed;
+         }
+
+         public void run()
+         {
+            try
+            {
+               session.start();
+
+               long lastTime = System.currentTimeMillis();
+
+               for (long i = 0; i < maxConsumed; i++)
+               {
+                  ClientMessage msg = consumer.receive(5000);
+
+                  if (msg == null)
+                  {
+                     errors.add(new Exception("didn't receive a message"));
+                     return;
+                  }
+
+                  msg.acknowledge();
+
+
+                  if (sleepTime > 0)
+                  {
+
+                     Thread.sleep(sleepTime);
+                  }
+
+                  if (i % 1000 == 0)
+                  {
+                     System.out.println("Consumed " + i + " events in " + (System.currentTimeMillis() - lastTime));
+                     lastTime = System.currentTimeMillis();
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               errors.add(e); // printed and rethrown by the test thread after join(), so a failed consumer fails the test
+            }
+         }
+      }
+
+
+      int numberOfMessages = 10000;
+
+      Consumer consumer1 = new Consumer(100, "-1", 150);
+      Consumer consumer2 = new Consumer(0, "-2", numberOfMessages);
+
+      final ServerLocator locator = createInVMLocator(0);
+      final ClientSessionFactory sf = locator.createSessionFactory();
+      final ClientSession session = sf.createSession(true, true);
+      final ClientProducer producer = session.createProducer(address);
+
+
+      byte[] b = new byte[1024];
+
+
+      for (long i = 0; i < numberOfMessages; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.getBodyBuffer().writeBytes(b);
+         producer.send(msg);
+
+         if (i == 1000)
+         {
+            System.out.println("Starting consumers!!!");
+            consumer1.start();
+            consumer2.start();
+         }
+
+         if (i % 1000 == 0)
+         {
+            validateInstances();
+         }
+
+      }
+
+
+      consumer1.join();
+      consumer2.join();
+
+      validateInstances();
+      Throwable elast = null;
+
+      for (Throwable e : errors)
+      {
+         e.printStackTrace();
+         elast = e;
+      }
+
+      if (elast != null)
+      {
+         throw elast;
+      }
+
+   }
+
+   private void validateInstances()
+   {
+      forceGC(); // inherited helper; presumably lets finalize() run so that deletePosition() has been applied - confirm
+      final int liveInstances = pagePosInstances.get();
+      Assert.assertTrue("There is a leak, you shouldn't have this many instances (" + liveInstances + ")", liveInstances < 5000);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java
index 22a39da..05c7245 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java
@@ -14,6 +14,8 @@ package org.hornetq.byteman.tests;
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.config.ScaleDownConfiguration;
+import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
@@ -37,23 +39,30 @@ public class ScaleDownFailoverTest extends ClusterTestBase
    {
       super.setUp();
       stopCount = 0;
-      setupServer(0, isFileStorage(), isNetty());
-      setupServer(1, isFileStorage(), isNetty());
-      setupServer(2, isFileStorage(), isNetty());
+      setupLiveServer(0, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(1, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(2, isFileStorage(), false, isNetty(), true);
+      ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+      ScaleDownConfiguration scaleDownConfiguration2 = new ScaleDownConfiguration();
+      scaleDownConfiguration2.setEnabled(false);
+      ScaleDownConfiguration scaleDownConfiguration3 = new ScaleDownConfiguration();
+      scaleDownConfiguration3.setEnabled(false);
+      ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
+      ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration2);
+      ((LiveOnlyPolicyConfiguration) servers[2].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration3);
       if (isGrouped())
       {
-         servers[0].getConfiguration().getHAPolicy().setScaleDownGroupName("bill");
-         servers[1].getConfiguration().getHAPolicy().setScaleDownGroupName("bill");
-         servers[2].getConfiguration().getHAPolicy().setScaleDownGroupName("bill");
+         scaleDownConfiguration.setGroupName("bill");
+         scaleDownConfiguration2.setGroupName("bill");
+         scaleDownConfiguration3.setGroupName("bill");
       }
-      servers[0].getConfiguration().getHAPolicy().setScaleDown(true);
       staticServers = servers;
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
-      servers[0].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
-      servers[1].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
-      servers[2].getConfiguration().getHAPolicy().getScaleDownConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      scaleDownConfiguration.getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      scaleDownConfiguration2.getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      scaleDownConfiguration3.getConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
 
       startServers(0, 1, 2);
       setupSessionFactory(0, isNetty());

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java
index 1a7b254..44cd4df 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java
@@ -13,6 +13,8 @@
 package org.hornetq.byteman.tests;
 
 import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.config.ScaleDownConfiguration;
+import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@@ -30,14 +32,15 @@ public class ScaleDownFailureTest extends ClusterTestBase
    public void setUp() throws Exception
    {
       super.setUp();
-      setupServer(0, isFileStorage(), isNetty());
-      setupServer(1, isFileStorage(), isNetty());
+      setupLiveServer(0, isFileStorage(), false, isNetty(), true);
+      setupLiveServer(1, isFileStorage(), false, isNetty(), true);
       if (isGrouped())
       {
-         servers[0].getConfiguration().getHAPolicy().setScaleDownGroupName("bill");
-         servers[1].getConfiguration().getHAPolicy().setScaleDownGroupName("bill");
+         ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+         scaleDownConfiguration.setGroupName("bill");
+         ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
+         ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration);
       }
-      servers[0].getConfiguration().getHAPolicy().setScaleDown(true);
       setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
       setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
       startServers(0, 1);
@@ -62,7 +65,7 @@ public class ScaleDownFailureTest extends ClusterTestBase
       closeAllConsumers();
       closeAllSessionFactories();
       closeAllServerLocatorsFactories();
-      servers[0].getConfiguration().getHAPolicy().setScaleDown(false);
+      ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(null);
       stopServers(0, 1);
       super.tearDown();
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java
index 28bd89c..a813bfb 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StartStopDeadlockTest.java
@@ -17,9 +17,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.cluster.ha.HAPolicy;
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 import org.hornetq.tests.unit.util.InVMNamingContext;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -76,20 +77,20 @@ public class StartStopDeadlockTest extends ServiceTestBase
    {
 
       // A live server that will always be crashed
-      Configuration confLive = createDefaultConfig(true);
-      confLive.setSecurityEnabled(false);
-      confLive.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE);
-      confLive.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      Configuration confLive = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration())
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       final HornetQServer serverLive = HornetQServers.newHornetQServer(confLive);
       serverLive.start();
       addServer(serverLive);
 
 
       // A backup that will be waiting to be activated
-      Configuration conf = createDefaultConfig(true);
-      conf.setSecurityEnabled(false);
-      conf.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE);
-      conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      Configuration conf = createDefaultConfig(true)
+         .setSecurityEnabled(false)
+         .setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration())
+         .addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
 
       final HornetQServer server = HornetQServers.newHornetQServer(conf, true);
       addServer(server);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java
----------------------------------------------------------------------
diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java
index b446f13..7bde640 100644
--- a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java
+++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/StompInternalStateTest.java
@@ -20,7 +20,7 @@ import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -88,12 +88,12 @@ public class StompInternalStateTest extends ServiceTestBase
    @Override
    protected Configuration createDefaultConfig(final boolean netty) throws Exception
    {
-      Configuration config = super.createDefaultConfig(netty);
-      config.setSecurityEnabled(false);
-      config.setPersistenceEnabled(false);
+      Configuration config = super.createDefaultConfig(netty)
+         .setSecurityEnabled(false)
+         .setPersistenceEnabled(false);
 
       Map<String, Object> params = new HashMap<String, Object>();
-      params.put(TransportConstants.PROTOCOL_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
       params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
       params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
@@ -107,14 +107,14 @@ public class StompInternalStateTest extends ServiceTestBase
    public void verifyBindingAddRemove(Notification noti, Object obj)
    {
       Set<String> destinations = (Set<String>)obj;
-      if (noti.getType() == NotificationType.BINDING_ADDED)
+      if (noti.getType() == CoreNotificationType.BINDING_ADDED)
       {
          if (!destinations.contains(STOMP_QUEUE_NAME))
          {
             resultTestStompProtocolManagerLeak += "didn't save the queue when binding added " + destinations;
          }
       }
-      else if (noti.getType() == NotificationType.BINDING_REMOVED)
+      else if (noti.getType() == CoreNotificationType.BINDING_REMOVED)
       {
          if (destinations.contains(STOMP_QUEUE_NAME))
          {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/config/logging.properties.trace
----------------------------------------------------------------------
diff --git a/tests/config/logging.properties.trace b/tests/config/logging.properties.trace
new file mode 100644
index 0000000..5bafa56
--- /dev/null
+++ b/tests/config/logging.properties.trace
@@ -0,0 +1,68 @@
+#
+# JBoss, Home of Professional Open Source.
+# Copyright 2010, Red Hat, Inc., and individual contributors
+# as indicated by the @author tags. See the copyright.txt file in the
+# distribution for a full listing of individual contributors.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation; either version 2.1 of
+# the License, or (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free
+# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+#
+
+# This is an example logging configuration that enables trace-level logging for the testsuite
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.jboss.logging,org.hornetq.core.server,org.hornetq.utils,org.hornetq.journal,org.hornetq.jms,org.hornetq.ra,org.hornetq.tests.unit,org.hornetq.tests.integration,org.hornetq.jms.tests
+
+# Root logger level
+logger.level=INFO
+# HornetQ logger levels
+logger.org.hornetq.core.server.level=TRACE
+logger.org.hornetq.journal.level=INFO
+logger.org.hornetq.utils.level=INFO
+logger.org.hornetq.jms.level=INFO
+logger.org.hornetq.ra.level=INFO
+logger.org.hornetq.tests.unit.level=INFO
+logger.org.hornetq.tests.integration.level=INFO
+logger.org.hornetq.jms.tests.level=INFO
+
+# Root logger handlers
+logger.handlers=CONSOLE,TEST
+#logger.handlers=CONSOLE,FILE
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=FINE
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.FileHandler
+handler.FILE.level=FINE
+handler.FILE.properties=autoFlush,fileName
+handler.FILE.autoFlush=true
+handler.FILE.fileName=target/hornetq.log
+handler.FILE.formatter=PATTERN
+
+# Test handler configuration (AssertionLoggerHandler)
+handler.TEST=org.hornetq.tests.logging.AssertionLoggerHandler
+handler.TEST.level=TRACE
+handler.TEST.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 05fc719..f25fddc 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -13,8 +13,8 @@
 
    <properties>
       <hornetq.basedir>${project.basedir}/../..</hornetq.basedir>
-      <vertx.version>2.1RC1</vertx.version>
-      <vertx.testtools.version>2.0.2-final</vertx.testtools.version>
+      <vertx.version>2.1.2</vertx.version>
+      <vertx.testtools.version>2.0.3-final</vertx.testtools.version>
    </properties>
 
    <dependencies>
@@ -66,6 +66,11 @@
       </dependency>
       <dependency>
          <groupId>org.hornetq</groupId>
+         <artifactId>hornetq-tools</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.hornetq</groupId>
          <artifactId>hornetq-twitter-integration</artifactId>
          <version>${project.version}</version>
       </dependency>
@@ -96,12 +101,12 @@
       </dependency>
       <dependency>
          <groupId>org.hornetq</groupId>
-         <artifactId>hornetq-aerogear-integration</artifactId>
+         <artifactId>hornetq-openwire-protocol</artifactId>
          <version>${project.version}</version>
       </dependency>
       <dependency>
          <groupId>org.hornetq</groupId>
-         <artifactId>hornetq-tools</artifactId>
+         <artifactId>hornetq-aerogear-integration</artifactId>
          <version>${project.version}</version>
       </dependency>
       <dependency>
@@ -121,11 +126,6 @@
          <artifactId>twitter4j-core</artifactId>
       </dependency>
       <dependency>
-         <groupId>com.hazelcast</groupId>
-         <artifactId>hazelcast</artifactId>
-         <version>2.6.6</version>
-      </dependency>
-      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
       </dependency>
@@ -147,9 +147,13 @@
          <version>0.24</version>
       </dependency>
       <dependency>
-         <groupId>org.apache.qpid</groupId>
-         <artifactId>proton-jms</artifactId>
-      </dependency>
+          <groupId>org.apache.qpid</groupId>
+          <artifactId>proton-j</artifactId>
+       </dependency>
+       <dependency>
+          <groupId>org.apache.qpid</groupId>
+          <artifactId>proton-jms</artifactId>
+       </dependency>
       <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-client</artifactId>
@@ -205,6 +209,22 @@
          <version>${vertx.testtools.version}</version>
          <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-client</artifactId>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
+         <version>1.0.1</version>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.ironjacamar</groupId>
+         <artifactId>ironjacamar-core-api</artifactId>
+         <scope>test</scope>
+      </dependency>
    </dependencies>
 
    <build>
@@ -234,7 +254,6 @@
                <skipTests>${skipIntegrationTests}</skipTests>
                <excludes>
                   <exclude>**/ReplicatedJMSFailoverTest.java</exclude>
-                  <exclude>**/Colocated*Test.java</exclude>
                   <exclude>org/hornetq/tests/util/*.java</exclude>
                </excludes>
                <argLine>-Djgroups.bind_addr=::1 ${hornetq-surefire-argline}</argLine>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
index fc66974..e1a859e 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/DuplicateDetectionTest.java
@@ -29,6 +29,7 @@ import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.transaction.impl.XidImpl;
@@ -169,6 +170,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
 
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
 
+      locator.setBlockOnNonDurableSend(true);
+
       ClientSessionFactory sf = createSessionFactory(locator);
 
       ClientSession session = sf.createSession(false, true, true);
@@ -209,10 +212,23 @@ public class DuplicateDetectionTest extends ServiceTestBase
          message2 = consumer.receiveImmediate();
          Assert.assertNull(message2);
 
+         message = createMessage(session, 3);
+         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+         producer.send(message);
+         message2 = consumer.receive(1000);
+         Assert.assertEquals(3, message2.getObjectProperty(propKey));
+
+         message = createMessage(session, 4);
+         message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, dupID.getData());
+         producer.send(message);
+         message2 = consumer.receiveImmediate();
+         Assert.assertNull(message2);
+
          producer.close();
          consumer.close();
 
-         Assert.assertEquals(1, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
+         // there will be 2 ID caches, one for messages using "_HQ_DUPL_ID" and one for "_HQ_BRIDGE_DUP"
+         Assert.assertEquals(2, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
          session.deleteQueue(queueName);
          Assert.assertEquals(0, ((PostOfficeImpl)messagingService.getPostOffice()).getDuplicateIDCaches().size());
       }
@@ -1275,9 +1291,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1361,11 +1376,10 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
       final int theCacheSize = 5;
 
-      conf.setIDCacheSize(theCacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(theCacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1443,12 +1457,11 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
       final int initialCacheSize = 10;
       final int subsequentCacheSize = 5;
 
-      conf.setIDCacheSize(initialCacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(initialCacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1537,12 +1550,11 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
       final int initialCacheSize = 10;
       final int subsequentCacheSize = 5;
 
-      conf.setIDCacheSize(initialCacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(initialCacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1641,11 +1653,9 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
-
-      conf.setPersistIDCache(false);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize)
+         .setPersistIDCache(false);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1729,11 +1739,9 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
-
-      conf.setPersistIDCache(false);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize)
+         .setPersistIDCache(false);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1821,9 +1829,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -1941,11 +1948,9 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
-
-      conf.setPersistIDCache(false);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize)
+         .setPersistIDCache(false);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -2047,9 +2052,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -2149,9 +2153,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       messagingService.stop();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize);
 
       HornetQServer messagingService2 = createServer(conf);
 
@@ -2263,9 +2266,8 @@ public class DuplicateDetectionTest extends ServiceTestBase
    {
       super.setUp();
 
-      Configuration conf = createDefaultConfig();
-
-      conf.setIDCacheSize(cacheSize);
+      Configuration conf = createDefaultConfig()
+         .setIDCacheSize(cacheSize);
 
       messagingService = createServer(true, conf);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
index b14d985..5ef7f85 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/String64KLimitTest.java
@@ -207,9 +207,8 @@ public class String64KLimitTest extends UnitTestCase
    {
       super.setUp();
 
-      Configuration config = createBasicConfig();
-      config.setSecurityEnabled(false);
-      config.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+      Configuration config = createBasicConfig()
+         .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
       server = addServer(HornetQServers.newHornetQServer(config, false));
       server.start();
       locator = createInVMNonHALocator();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java
index ae4223c..41819bb 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/aerogear/AeroGearBasicServerTest.java
@@ -73,7 +73,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       connector0.setHost("localhost");
       jetty.addConnector(connector0);
       jetty.start();
-      Configuration configuration = createDefaultConfig();
       HashMap<String, Object> params = new HashMap();
       params.put(AeroGearConstants.QUEUE_NAME, "testQueue");
       params.put(AeroGearConstants.ENDPOINT_NAME, "http://localhost:8080");
@@ -86,10 +85,15 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       params.put(AeroGearConstants.DEVICE_TYPE_NAME, "android,ipad");
       params.put(AeroGearConstants.SOUND_NAME, "sound1");
       params.put(AeroGearConstants.VARIANTS_NAME, "variant1,variant2");
-      configuration.getConnectorServiceConfigurations().add(
-         new ConnectorServiceConfiguration(AeroGearConnectorServiceFactory.class.getName(), params, "TestAeroGearService"));
-
-      configuration.getQueueConfigurations().add(new CoreQueueConfiguration("testQueue", "testQueue", null, true));
+      Configuration configuration = createDefaultConfig()
+         .addConnectorServiceConfiguration(
+            new ConnectorServiceConfiguration()
+               .setFactoryClassName(AeroGearConnectorServiceFactory.class.getName())
+               .setParams(params)
+               .setName("TestAeroGearService"))
+         .addQueueConfiguration(new CoreQueueConfiguration()
+                                   .setAddress("testQueue")
+                                   .setName("testQueue"));
       server = createServer(configuration);
       server.start();
 
@@ -151,9 +155,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       String badge = body.getString("badge");
       assertNotNull(badge);
       assertEquals(badge, "99");
-      Integer ttl = body.getInt("ttl");
-      assertNotNull(ttl);
-      assertEquals(ttl.intValue(), 3600);
       JSONArray jsonArray = (JSONArray) aeroGearHandler.jsonObject.get("variants");
       assertNotNull(jsonArray);
       assertEquals(jsonArray.getString(0), "variant1");
@@ -167,6 +168,9 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       assertNotNull(jsonArray);
       assertEquals(jsonArray.getString(0), "android");
       assertEquals(jsonArray.getString(1), "ipad");
+      Integer ttl = (Integer) aeroGearHandler.jsonObject.get("ttl");
+      assertNotNull(ttl);
+      assertEquals(ttl.intValue(), 3600);
       latch = new CountDownLatch(1);
       aeroGearHandler.resetLatch(latch);
 
@@ -194,9 +198,6 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       badge = body.getString("badge");
       assertNotNull(badge);
       assertEquals(badge, "111");
-      ttl = body.getInt("ttl");
-      assertNotNull(ttl);
-      assertEquals(ttl.intValue(), 10000);
       jsonArray = (JSONArray) aeroGearHandler.jsonObject.get("variants");
       assertNotNull(jsonArray);
       assertEquals(jsonArray.getString(0), "v1");
@@ -209,6 +210,9 @@ public class AeroGearBasicServerTest extends ServiceTestBase
       assertNotNull(jsonArray);
       assertEquals(jsonArray.getString(0), "dev1");
       assertEquals(jsonArray.getString(1), "dev2");
+      ttl = (Integer) aeroGearHandler.jsonObject.get("ttl");
+      assertNotNull(ttl);
+      assertEquals(ttl.intValue(), 10000);
       session.start();
       ClientMessage message = session.createConsumer("testQueue").receiveImmediate();
       assertNull(message);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
index d5b7426..5d0362c 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
@@ -30,8 +30,10 @@ import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.protocol.core.impl.HornetQConsumerContext;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 import org.hornetq.tests.integration.IntegrationTestLogger;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.UUID;
@@ -333,9 +335,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public Object getId()
+      public ConsumerContext getConsumerContext()
       {
-         return id;
+         return new HornetQConsumerContext(this.id);
       }
 
       @Override
@@ -363,9 +365,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setMessageHandler(MessageHandler handler) throws HornetQException
+      public FakeConsumerWithID setMessageHandler(MessageHandler handler) throws HornetQException
       {
-
+         return this;
       }
 
       @Override
@@ -410,9 +412,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setUserID(UUID userID)
+      public FakeMessageWithID setUserID(UUID userID)
       {
-
+         return this;
       }
 
       @Override
@@ -440,9 +442,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setDurable(boolean durable)
+      public FakeMessageWithID setDurable(boolean durable)
       {
-
+         return this;
       }
 
       @Override
@@ -458,9 +460,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setExpiration(long expiration)
+      public FakeMessageWithID setExpiration(long expiration)
       {
-
+         return this;
       }
 
       @Override
@@ -470,9 +472,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setTimestamp(long timestamp)
+      public FakeMessageWithID setTimestamp(long timestamp)
       {
-
+         return this;
       }
 
       @Override
@@ -482,9 +484,9 @@ public class AcknowledgeTest extends ServiceTestBase
       }
 
       @Override
-      public void setPriority(byte priority)
+      public FakeMessageWithID setPriority(byte priority)
       {
-
+         return this;
       }
 
       @Override
@@ -810,5 +812,17 @@ public class AcknowledgeTest extends ServiceTestBase
       {
          return null;
       }
+
+      @Override
+      public FakeMessageWithID writeBodyBufferBytes(byte[] bytes)
+      {
+         return this;
+      }
+
+      @Override
+      public FakeMessageWithID writeBodyBufferString(String string)
+      {
+         return this;
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
index d4e88b9..ce687ac 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
@@ -156,9 +156,9 @@ public class CommitRollbackTest extends ServiceTestBase
       cc2.close();
       session.rollback();
       Assert.assertEquals(0, q2.getDeliveringCount());
-      Assert.assertEquals(numMessages, q.getMessageCount());
+      Assert.assertEquals(numMessages, getMessageCount(q));
       Assert.assertEquals(0, q2.getDeliveringCount());
-      Assert.assertEquals(numMessages, q.getMessageCount());
+      Assert.assertEquals(numMessages, getMessageCount(q));
       sendSession.close();
       session.close();
    }
@@ -209,10 +209,10 @@ public class CommitRollbackTest extends ServiceTestBase
       Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
       Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable();
       Assert.assertEquals(numMessages, q.getDeliveringCount());
-      Assert.assertEquals(numMessages, q.getMessageCount());
+      Assert.assertEquals(numMessages, getMessageCount(q));
       session.commit();
       Assert.assertEquals(0, q.getDeliveringCount());
-      Assert.assertEquals(0, q.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(q));
       sendSession.close();
       session.close();
 
@@ -243,11 +243,11 @@ public class CommitRollbackTest extends ServiceTestBase
       Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
       Queue q = (Queue) server.getPostOffice().getBinding(queueA).getBindable();
       Assert.assertEquals(numMessages, q.getDeliveringCount());
-      Assert.assertEquals(numMessages, q.getMessageCount());
+      Assert.assertEquals(numMessages, getMessageCount(q));
       session.stop();
       session.rollback();
       Assert.assertEquals(0, q.getDeliveringCount());
-      Assert.assertEquals(numMessages, q.getMessageCount());
+      Assert.assertEquals(numMessages, getMessageCount(q));
       latch = new CountDownLatch(numMessages);
       cc.setMessageHandler(new ackHandler(session, latch));
       session.start();


Mime
View raw message