activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/6] activemq-artemis git commit: ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support
Date Fri, 09 Oct 2015 02:55:54 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index d5c0d38..918dd0d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.cluster;
 
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@@ -27,10 +28,23 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactor
  */
 public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolManagerFactory {
 
-   private static final ActiveMQServerSideProtocolManagerFactory INSTANCE = new ActiveMQServerSideProtocolManagerFactory();
 
-   public static ActiveMQServerSideProtocolManagerFactory getInstance() {
-      return INSTANCE;
+   ServerLocator locator;
+
+   @Override
+   public ServerLocator getLocator() {
+      return locator;
+   }
+
+   @Override
+   public void setLocator(ServerLocator locator) {
+      this.locator = locator;
+   }
+
+   public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) {
+      ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory();
+      instance.setLocator(locator);
+      return instance;
    }
 
    private ActiveMQServerSideProtocolManagerFactory() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
index 44d28c0..bc4a0ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
@@ -207,7 +207,7 @@ public class BackupManager implements ActiveMQComponent {
             backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
             backupServerLocator.setReconnectAttempts(-1);
             backupServerLocator.setInitialConnectAttempts(-1);
-            backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+            backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator));
          }
       }
 
@@ -332,7 +332,7 @@ public class BackupManager implements ActiveMQComponent {
             }
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
             locator.setClusterConnection(true);
-            locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+            locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
             return locator;
          }
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 1a115cf..bd097be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -182,7 +182,7 @@ public class ClusterController implements ActiveMQComponent {
       serverLocator.setReconnectAttempts(-1);
       serverLocator.setInitialConnectAttempts(-1);
       //this is used for replication so need to use the server packet decoder
-      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
       locators.put(name, serverLocator);
    }
 
@@ -237,7 +237,7 @@ public class ClusterController implements ActiveMQComponent {
     * @return the Cluster Control
     */
    public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
-      sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator()));
       return new ClusterControl(sf, server);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index e336b33..4db67fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -129,7 +129,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
 
    @Override
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
-      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
       setSessionFactory(factory);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index e6b4bf6..d290c6e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -595,7 +595,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
          serverLocator.setAfterConnectionInternalListener(this);
 
-         serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+         serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
 
          serverLocator.start(server.getExecutorFactory().getExecutor());
       }
@@ -760,7 +760,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
       targetLocator.setAfterConnectionInternalListener(this);
 
-      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
 
       targetLocator.setNodeID(nodeId);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 77894bf..77b9f3f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -31,15 +34,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 
-import java.util.List;
-import java.util.Map;
-
 /*
 * Instead of loading into its own post office this will use its parent server (the actual live server) and load into that.
 * Since the server is already running we have to make sure we don't route any message that may subsequently get deleted or acked.
@@ -88,7 +88,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
       ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
-      locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
 
       try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
          scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index 1c82bbf..59ffd6a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -115,7 +115,7 @@ public class LiveOnlyActivation extends Activation {
       try {
          scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
          //use a Node Locator to connect to the cluster
-         scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+         scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator));
          LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer);
          scaleDownServerLocator.addClusterTopologyListener(nodeLocator);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 8ff315a..3de5d5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -55,5 +55,10 @@ public interface ProtocolManager<P extends BaseInterceptor> {
     */
    MessageConverter getConverter();
 
+   /** If this protocols accepts connectoins without an initial handshake.
+    *  If true this protocol will be the failback case no other conenctions are made.
+    *  New designed protocols should always require a handshake. This is only useful for legacy protocols. */
+   boolean acceptsNoHandshake();
+
    void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 436ec3c..715094c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -24,11 +24,15 @@ import java.beans.Introspector;
 import java.beans.PropertyDescriptor;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -206,6 +210,18 @@ public abstract class ActiveMQTestBase extends Assert {
       temporaryFolder = new TemporaryFolder(parent);
    }
 
+   protected <T> T serialClone(Object object) throws Exception {
+      System.out.println("object::" + object);
+      ByteArrayOutputStream bout = new ByteArrayOutputStream();
+      ObjectOutputStream obOut = new ObjectOutputStream(bout);
+      obOut.writeObject(object);
+
+      ByteArrayInputStream binput = new ByteArrayInputStream(bout.toByteArray());
+      ObjectInputStream obinp = new ObjectInputStream(binput);
+      return (T) obinp.readObject();
+
+   }
+
    @After
    public void tearDown() throws Exception {
       for (ExecutorService s : executorSet) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
index e6bb229..3b2b3e7 100644
--- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
+++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java
@@ -75,7 +75,7 @@ public class ActiveMQXAResourceRecovery {
          String username = parser.getUsername();
          String password = parser.getPassword();
          TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
-         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null);
+         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null, null);
       }
 
       res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
index 6b3172b..a75bdac 100644
--- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
+++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
 
 /**
  * This represents the configuration of a single connection factory.
@@ -42,13 +43,14 @@ public class XARecoveryConfig {
    private final String username;
    private final String password;
    private final Map<String, String> properties;
+   private final ClientProtocolManagerFactory clientProtocolManager;
 
    public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map<String, String> properties) {
       if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) {
-         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties);
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
       }
       else {
-         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties);
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
       }
 
    }
@@ -57,28 +59,54 @@ public class XARecoveryConfig {
                            final TransportConfiguration[] transportConfiguration,
                            final String username,
                            final String password,
-                           final Map<String, String> properties) {
-      this.transportConfiguration = transportConfiguration;
+                           final Map<String, String> properties,
+                           final ClientProtocolManagerFactory clientProtocolManager) {
+      TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
+      for (int i = 0; i < transportConfiguration.length; i++) {
+         newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
+      }
+
+      this.transportConfiguration = newTransportConfiguration;
       this.discoveryConfiguration = null;
       this.username = username;
       this.password = password;
       this.ha = ha;
       this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
+      this.clientProtocolManager = clientProtocolManager;
    }
 
+
    public XARecoveryConfig(final boolean ha,
-                           final DiscoveryGroupConfiguration discoveryConfiguration,
+                           final TransportConfiguration[] transportConfiguration,
                            final String username,
                            final String password,
                            final Map<String, String> properties) {
+      this(ha, transportConfiguration, username, password, properties, null);
+   }
+
+   public XARecoveryConfig(final boolean ha,
+                           final DiscoveryGroupConfiguration discoveryConfiguration,
+                           final String username,
+                           final String password,
+                           final Map<String, String> properties,
+                           final ClientProtocolManagerFactory clientProtocolManager) {
       this.discoveryConfiguration = discoveryConfiguration;
       this.transportConfiguration = null;
       this.username = username;
       this.password = password;
       this.ha = ha;
+      this.clientProtocolManager = clientProtocolManager;
       this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
    }
 
+   public XARecoveryConfig(final boolean ha,
+                           final DiscoveryGroupConfiguration discoveryConfiguration,
+                           final String username,
+                           final String password,
+                           final Map<String, String> properties) {
+      this(ha, discoveryConfiguration, username, password, properties, null);
+   }
+
    public boolean isHA() {
       return ha;
    }
@@ -103,6 +131,10 @@ public class XARecoveryConfig {
       return properties;
    }
 
+   public ClientProtocolManagerFactory getClientProtocolManager() {
+      return clientProtocolManager;
+   }
+
    /**
     * Create a serverLocator using the configuration
     *
@@ -110,10 +142,10 @@ public class XARecoveryConfig {
     */
    public ServerLocator createServerLocator() {
       if (getDiscoveryConfiguration() != null) {
-         return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
+         return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
       }
       else {
-         return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
+         return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java
----------------------------------------------------------------------
diff --git a/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java b/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java
new file mode 100644
index 0000000..b5f1021
--- /dev/null
+++ b/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.service.extensions.tests.recovery;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class XARecoveryConfigTest {
+
+   @Test
+   public void testEquals() throws Exception {
+      String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
+
+      TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
+      XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
+                                                     null, null, null);
+
+      TransportConfiguration transportConfig2 = new TransportConfiguration(factClass, null);
+      XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
+                                                      null, null, null);
+
+      // They are using Different names
+      Assert.assertNotEquals(transportConfig, transportConfig2);
+      Assert.assertEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
+
+      // The equals here shouldn't take the name into consideration
+      Assert.assertEquals(config, config2);
+   }
+
+   @Test
+   public void testNotEquals() throws Exception {
+      String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
+
+      TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
+      XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
+                                                     null, null, null);
+
+      TransportConfiguration transportConfig2 = new TransportConfiguration(factClass + "2", null);
+      XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
+                                                      null, null, null);
+
+      // They are using Different names
+      Assert.assertNotEquals(transportConfig, transportConfig2);
+      Assert.assertNotEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
+
+      // The equals here shouldn't take the name into consideration
+      Assert.assertNotEquals(config, config2);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java
new file mode 100644
index 0000000..3768f8b
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
+import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
+import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
+import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HornetQProtocolManagerTest extends ActiveMQTestBase {
+
+   ActiveMQServer server;
+   EmbeddedJMS embeddedJMS;
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration configuration = createDefaultConfig(false);
+      configuration.setPersistenceEnabled(false);
+      configuration.getAcceptorConfigurations().clear();
+      configuration.addAcceptorConfiguration("legacy", "tcp://localhost:61616?protocols=HORNETQ").
+                    addAcceptorConfiguration("corepr", "tcp://localhost:61617?protocols=CORE");
+
+      configuration.addConnectorConfiguration("legacy", "tcp://localhost:61616");
+      JMSConfiguration jmsConfiguration = new JMSConfigurationImpl();
+
+      jmsConfiguration.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName("testQueue").setBindings("testQueue"));
+      embeddedJMS = new EmbeddedJMS();
+      embeddedJMS.setConfiguration(configuration);
+      embeddedJMS.setJmsConfiguration(jmsConfiguration);
+      embeddedJMS.start();
+   }
+
+   public void tearDown() throws Exception {
+      embeddedJMS.stop();
+      super.tearDown();
+   }
+
+   @Test
+   public void testLegacy() throws Exception {
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=" + HornetQClientProtocolManagerFactory.class.getName());
+      connectionFactory.createConnection().close();
+      ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("tcp://localhost:61617");
+      connectionFactory2.createConnection().close();
+
+      RecoveryManager manager = new RecoveryManager();
+      manager.register(connectionFactory, null, null, new ConcurrentHashMap<String, String>());
+      manager.register(connectionFactory2, null, null, new ConcurrentHashMap<String, String>());
+
+      for (XARecoveryConfig resource :manager.getResources()) {
+         ServerLocator locator = resource.createServerLocator();
+         ClientSessionFactory factory = locator.createSessionFactory();
+         ClientSession session = factory.createSession();
+         session.close();
+         factory.close();
+         locator.close();
+      }
+
+   }
+
+
+   /** This test will use an ArtemisConnectionFactory with clientProtocolManager=*/
+   @Test
+   public void testLegacy2() throws Exception {
+
+      ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl();
+      configuration.setConnectorNames("legacy");
+      configuration.setName("legacy");
+      configuration.setProtocolManagerFactoryStr(HornetQClientProtocolManagerFactory.class.getName());
+      embeddedJMS.getJMSServerManager().createConnectionFactory(false, configuration, "legacy");
+
+      Queue queue = (Queue) embeddedJMS.lookup("testQueue");
+
+
+      ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) embeddedJMS.lookup("legacy");
+      Connection connection = connectionFactory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage("Test");
+      for (int i = 0; i < 5; i++) {
+         message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "duplicate");
+         producer.send(message);
+      }
+
+      connection.start();
+      MessageConsumer consumer = session.createConsumer(queue);
+      TextMessage messageRec = (TextMessage)consumer.receive(5000);
+      Assert.assertNotNull(messageRec);
+
+      Assert.assertEquals("Test", messageRec.getText());
+      Assert.assertNull(consumer.receiveNoWait());
+      connection.close();
+      connectionFactory.close();
+
+   }
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
index 789e73a..c4b17f6 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -29,15 +33,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.hornetq.api.core.client.HornetQClient;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
 public class HornetQProtocolTest extends ActiveMQTestBase {
 
    protected ActiveMQServer server;
@@ -59,6 +61,13 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
       waitForServerToStart(server);
    }
 
+
+   @After
+   public void tearDown() throws Exception {
+      org.hornetq.core.client.impl.ServerLocatorImpl.clearThreadPools();
+      super.tearDown();
+   }
+
    @Test
    public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception {
       org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
@@ -83,6 +92,7 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
       }
 
       ClientMessage coreMessage1 = coreConsumer.receive(1000);
+      System.err.println("Messages::==" + coreMessage1);
       assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
       coreSession.close();
 
@@ -94,6 +104,39 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testLargeMessagesOverHornetQClients() throws Exception {
+      org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
+
+      // Create Queue
+      String queueName = "test.hq.queue";
+      hqSession.createQueue(queueName, queueName, true);
+
+      // HornetQ Client Objects
+      hqSession.start();
+      org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
+      org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
+
+      for (int i = 0; i < 2; i++) {
+         org.hornetq.api.core.client.ClientMessage hqMessage = hqSession.createMessage(true);
+         hqMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
+         hqProducer.send(hqMessage);
+      }
+      hqSession.commit();
+
+      for (int i = 0; i < 2; i++) {
+         org.hornetq.api.core.client.ClientMessage coreMessage1 = hqConsumer.receive(1000);
+         coreMessage1.acknowledge();
+         System.err.println("Messages::==" + coreMessage1);
+         for (int j = 0; j < 10 * 1024; j++) {
+            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(j), coreMessage1.getBodyBuffer().readByte());
+         }
+
+      }
+
+      hqSession.close();
+   }
+
+   @Test
    public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception {
       org.hornetq.api.core.client.ClientSession session = createHQClientSession();
 
@@ -158,14 +201,14 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
    }
 
    private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session) {
-      org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
+      org.hornetq.api.core.client.ClientMessage message = session.createMessage(true);
       String v = UUID.randomUUID().toString();
       message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
       return message;
    }
 
    private ClientMessage createCoreTestMessage(ClientSession session) {
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(true);
       String v = UUID.randomUUID().toString();
       message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
       return message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java
deleted file mode 100644
index c87833b..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java
+++ /dev/null
@@ -1,1030 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
-import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
-import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class InterceptorTest extends ActiveMQTestBase {
-
-   private ActiveMQServer server;
-
-   private final SimpleString QUEUE = new SimpleString("InterceptorTestQueue");
-
-   private ServerLocator locator;
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-
-      server = createServer(false, true);
-
-      server.start();
-
-      locator = createNettyNonHALocator();
-   }
-
-   private static final String key = "fruit";
-
-   private class MyInterceptor1 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_SEND) {
-            SessionSendMessage p = (SessionSendMessage) packet;
-
-            ServerMessage sm = (ServerMessage) p.getMessage();
-
-            sm.putStringProperty(InterceptorTest.key, "orange");
-         }
-
-         return true;
-      }
-
-   }
-
-   private class InterceptUserOnCreateQueue implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.CREATE_QUEUE) {
-            String userName = getUsername(packet, connection);
-            CreateQueueMessage createQueue = (CreateQueueMessage) packet;
-            createQueue.setFilterString(new SimpleString("userName='" + userName + "'"));
-
-            System.out.println("userName = " + userName);
-         }
-         else if (packet.getType() == PacketImpl.SESS_SEND) {
-            String userName = getUsername(packet, connection);
-            MessagePacket msgPacket = (MessagePacket) packet;
-            msgPacket.getMessage().putStringProperty("userName", userName);
-
-            System.out.println("userName on send = " + userName);
-         }
-
-         return true;
-      }
-
-      public String getUsername(final Packet packet, final RemotingConnection connection) {
-         RemotingConnectionImpl impl = (RemotingConnectionImpl) connection;
-         ChannelImpl channel = (ChannelImpl) impl.getChannel(packet.getChannelID(), -1);
-         ServerSessionPacketHandler sessionHandler = (ServerSessionPacketHandler) channel.getHandler();
-         return sessionHandler.getSession().getUsername();
-      }
-
-   }
-
-   private class InterceptUserOnCreateConsumer implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_CREATECONSUMER) {
-            String userName = getUsername(packet, connection);
-            SessionCreateConsumerMessage createQueue = (SessionCreateConsumerMessage) packet;
-            createQueue.setFilterString(new SimpleString("userName='" + userName + "'"));
-
-            System.out.println("userName = " + userName);
-         }
-         else if (packet.getType() == PacketImpl.SESS_SEND) {
-            String userName = getUsername(packet, connection);
-            MessagePacket msgPacket = (MessagePacket) packet;
-            msgPacket.getMessage().putStringProperty("userName", userName);
-
-            System.out.println("userName on send = " + userName);
-         }
-
-         return true;
-      }
-
-      public String getUsername(final Packet packet, final RemotingConnection connection) {
-         RemotingConnectionImpl impl = (RemotingConnectionImpl) connection;
-         ChannelImpl channel = (ChannelImpl) impl.getChannel(packet.getChannelID(), -1);
-         ServerSessionPacketHandler sessionHandler = (ServerSessionPacketHandler) channel.getHandler();
-         return sessionHandler.getSession().getUsername();
-      }
-
-   }
-
-   private class MyOutgoingInterceptor1 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-            SessionReceiveMessage p = (SessionReceiveMessage) packet;
-
-            ServerMessage sm = (ServerMessage) p.getMessage();
-
-            sm.putStringProperty(InterceptorTest.key, "orange");
-         }
-
-         return true;
-      }
-
-   }
-
-   private class MyInterceptor2 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_SEND) {
-            return false;
-         }
-
-         return true;
-      }
-
-   }
-
-   private class MyOutgoingInterceptor2 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (isForceDeliveryResponse(packet)) {
-            return true;
-         }
-
-         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-            return false;
-         }
-
-         return true;
-      }
-   }
-
-   private class MyInterceptor3 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-            SessionReceiveMessage p = (SessionReceiveMessage) packet;
-
-            ClientMessage cm = (ClientMessage) p.getMessage();
-
-            cm.putStringProperty(InterceptorTest.key, "orange");
-         }
-
-         return true;
-      }
-
-   }
-
-   private class MyOutgoingInterceptor3 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_SEND) {
-            SessionSendMessage p = (SessionSendMessage) packet;
-
-            ClientMessage cm = (ClientMessage) p.getMessage();
-
-            cm.putStringProperty(InterceptorTest.key, "orange");
-         }
-
-         return true;
-      }
-
-   }
-
-   private class MyInterceptor4 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (isForceDeliveryResponse(packet)) {
-            return true;
-         }
-
-         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-            return false;
-         }
-
-         return true;
-      }
-
-   }
-
-   private class MyOutgoingInterceptor4 implements Interceptor {
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (isForceDeliveryResponse(packet)) {
-            return true;
-         }
-
-         if (packet.getType() == PacketImpl.SESS_SEND) {
-            return false;
-         }
-
-         return true;
-      }
-
-   }
-
-   /**
-    * @param packet
-    */
-   private boolean isForceDeliveryResponse(final Packet packet) {
-      if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-         SessionReceiveMessage msg = (SessionReceiveMessage) packet;
-         if (msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
-            return true;
-         }
-      }
-
-      return false;
-   }
-
-   private class MyInterceptor5 implements Interceptor {
-
-      private final String key;
-
-      private final int num;
-
-      private volatile boolean reject;
-
-      private volatile boolean wasCalled;
-
-      MyInterceptor5(final String key, final int num) {
-         this.key = key;
-
-         this.num = num;
-      }
-
-      public void setReject(final boolean reject) {
-         this.reject = reject;
-      }
-
-      public boolean wasCalled() {
-         return wasCalled;
-      }
-
-      public void setWasCalled(final boolean wasCalled) {
-         this.wasCalled = wasCalled;
-      }
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-         if (packet.getType() == PacketImpl.SESS_SEND) {
-            SessionSendMessage p = (SessionSendMessage) packet;
-
-            ServerMessage sm = (ServerMessage) p.getMessage();
-
-            sm.putIntProperty(key, num);
-
-            wasCalled = true;
-
-            return !reject;
-         }
-
-         return true;
-
-      }
-
-   }
-
-   private class MyInterceptor6 implements Interceptor {
-
-      private final String key;
-
-      private final int num;
-
-      private volatile boolean reject;
-
-      private volatile boolean wasCalled;
-
-      MyInterceptor6(final String key, final int num) {
-         this.key = key;
-
-         this.num = num;
-      }
-
-      public void setReject(final boolean reject) {
-         this.reject = reject;
-      }
-
-      public boolean wasCalled() {
-         return wasCalled;
-      }
-
-      public void setWasCalled(final boolean wasCalled) {
-         this.wasCalled = wasCalled;
-      }
-
-      public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
-
-         if (isForceDeliveryResponse(packet)) {
-            return true;
-         }
-
-         if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
-            SessionReceiveMessage p = (SessionReceiveMessage) packet;
-
-            Message sm = p.getMessage();
-
-            sm.putIntProperty(key, num);
-
-            wasCalled = true;
-
-            return !reject;
-         }
-
-         return true;
-
-      }
-
-   }
-
-   @Test
-   public void testServerInterceptorChangeProperty() throws Exception {
-      MyInterceptor1 interceptor = new MyInterceptor1();
-
-      server.getRemotingService().addIncomingInterceptor(interceptor);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putIntProperty("count", i);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         assertNotNull(message);
-
-         assertEquals(i, message.getIntProperty("count").intValue());
-
-         Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
-      }
-
-      server.getRemotingService().removeIncomingInterceptor(interceptor);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("apple", message.getStringProperty(InterceptorTest.key));
-      }
-
-      session.close();
-   }
-
-   // This is testing if it's possible to intercept usernames and do some real stuff as users want
-   @Test
-   public void testInterceptUsernameOnQueues() throws Exception {
-
-      SimpleString ANOTHER_QUEUE = QUEUE.concat("another");
-      ActiveMQSecurityManagerImpl securityManager = (ActiveMQSecurityManagerImpl) server.getSecurityManager();
-      securityManager.getConfiguration().addUser("dumb", "dumber");
-      securityManager.getConfiguration().addUser("an", "other");
-
-      server.getRemotingService().addIncomingInterceptor(new InterceptUserOnCreateQueue());
-
-      locator.setBlockOnDurableSend(true);
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession("dumb", "dumber", false, false, false, false, 0);
-
-      ClientSession sessionAnotherUser = sf.createSession("an", "other", false, false, false, false, 0);
-
-      session.createQueue(QUEUE, QUEUE, null, true);
-
-      sessionAnotherUser.createQueue(QUEUE, ANOTHER_QUEUE, null, true);
-
-      ClientProducer prod = session.createProducer(QUEUE);
-
-      ClientProducer prodAnother = sessionAnotherUser.createProducer(QUEUE);
-
-      ClientMessage msg = session.createMessage(true);
-      prod.send(msg);
-      session.commit();
-
-      prodAnother.send(msg);
-      sessionAnotherUser.commit();
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-      ClientConsumer consumerAnother = sessionAnotherUser.createConsumer(ANOTHER_QUEUE);
-
-      session.start();
-      sessionAnotherUser.start();
-
-      msg = consumer.receive(1000);
-      assertNotNull(msg);
-      assertEquals("dumb", msg.getStringProperty("userName"));
-      msg.acknowledge();
-      assertNull(consumer.receiveImmediate());
-
-      msg = consumerAnother.receive(1000);
-      assertNotNull(msg);
-      assertEquals("an", msg.getStringProperty("userName"));
-      msg.acknowledge();
-      assertNull(consumerAnother.receiveImmediate());
-
-      session.close();
-      sessionAnotherUser.close();
-   }
-
-   // This is testing if it's possible to intercept usernames and do some real stuff as users want
-   @Test
-   public void testInterceptUsernameOnConsumer() throws Exception {
-      ActiveMQSecurityManagerImpl securityManager = (ActiveMQSecurityManagerImpl) server.getSecurityManager();
-      securityManager.getConfiguration().addUser("dumb", "dumber");
-      securityManager.getConfiguration().addUser("an", "other");
-
-      server.getRemotingService().addIncomingInterceptor(new InterceptUserOnCreateConsumer());
-
-      locator.setBlockOnDurableSend(true);
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession("dumb", "dumber", false, false, false, false, 0);
-
-      ClientSession sessionAnotherUser = sf.createSession("an", "other", false, false, false, false, 0);
-
-      session.createQueue(QUEUE, QUEUE, null, true);
-
-      ClientProducer prod = session.createProducer(QUEUE);
-
-      ClientProducer prodAnother = sessionAnotherUser.createProducer(QUEUE);
-
-      ClientMessage msg = session.createMessage(true);
-      prod.send(msg);
-      session.commit();
-
-      prodAnother.send(msg);
-      sessionAnotherUser.commit();
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-      ClientConsumer consumerAnother = sessionAnotherUser.createConsumer(QUEUE);
-
-      session.start();
-      sessionAnotherUser.start();
-
-      msg = consumer.receive(1000);
-      assertNotNull(msg);
-      assertEquals("dumb", msg.getStringProperty("userName"));
-      msg.acknowledge();
-      assertNull(consumer.receiveImmediate());
-
-      msg = consumerAnother.receive(1000);
-      assertNotNull(msg);
-      assertEquals("an", msg.getStringProperty("userName"));
-      msg.acknowledge();
-      assertNull(consumerAnother.receiveImmediate());
-
-      session.close();
-      sessionAnotherUser.close();
-   }
-
-   @Test
-   public void testServerInterceptorRejectPacket() throws Exception {
-      MyInterceptor2 interceptor = new MyInterceptor2();
-
-      server.getRemotingService().addIncomingInterceptor(interceptor);
-
-      locator.setBlockOnNonDurableSend(false);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      ClientMessage message = consumer.receiveImmediate();
-
-      Assert.assertNull(message);
-
-      session.close();
-   }
-
-   @Test
-   public void testClientInterceptorChangeProperty() throws Exception {
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      MyInterceptor3 interceptor = new MyInterceptor3();
-
-      sf.getServerLocator().addIncomingInterceptor(interceptor);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
-      }
-
-      sf.getServerLocator().removeIncomingInterceptor(interceptor);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("apple", message.getStringProperty(InterceptorTest.key));
-      }
-
-      session.close();
-   }
-
-   @Test
-   public void testClientOutgoingInterceptorChangeProperty() throws Exception {
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      MyOutgoingInterceptor3 interceptor = new MyOutgoingInterceptor3();
-
-      sf.getServerLocator().addOutgoingInterceptor(interceptor);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
-      }
-
-      sf.getServerLocator().removeOutgoingInterceptor(interceptor);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("apple", message.getStringProperty(InterceptorTest.key));
-      }
-
-      session.close();
-   }
-
-   @Test
-   public void testClientInterceptorRejectPacket() throws Exception {
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      MyInterceptor4 interceptor = new MyInterceptor4();
-
-      sf.getServerLocator().addIncomingInterceptor(interceptor);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      ClientMessage message = consumer.receive(100);
-
-      Assert.assertNull(message);
-
-      session.close();
-   }
-
-   @Test
-   public void testClientOutgoingInterceptorRejectPacketOnNonBlockingSend() throws Exception {
-      locator.setBlockOnNonDurableSend(false);
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      MyOutgoingInterceptor4 interceptor = new MyOutgoingInterceptor4();
-
-      sf.getServerLocator().addOutgoingInterceptor(interceptor);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      ClientMessage message = consumer.receive(100);
-
-      Assert.assertNull(message);
-
-      session.close();
-   }
-
-   @Test
-   public void testClientOutgoingInterceptorRejectPacketOnBlockingSend() throws Exception {
-      // must make the call block to exercise the right logic
-      locator.setBlockOnNonDurableSend(true);
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      MyOutgoingInterceptor4 interceptor = new MyOutgoingInterceptor4();
-
-      sf.getServerLocator().addOutgoingInterceptor(interceptor);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      ClientMessage message = session.createMessage(false);
-
-      try {
-         producer.send(message);
-         Assert.fail();
-      }
-      catch (ActiveMQException e) {
-         // expected exception
-         Assert.assertTrue(e.getType().getCode() == ActiveMQExceptionType.INTERCEPTOR_REJECTED_PACKET.getCode());
-      }
-   }
-
-   @Test
-   public void testServerMultipleInterceptors() throws Exception {
-      MyInterceptor5 interceptor1 = new MyInterceptor5("a", 1);
-      MyInterceptor5 interceptor2 = new MyInterceptor5("b", 2);
-      MyInterceptor5 interceptor3 = new MyInterceptor5("c", 3);
-      MyInterceptor5 interceptor4 = new MyInterceptor5("d", 4);
-
-      server.getRemotingService().addIncomingInterceptor(interceptor1);
-      server.getRemotingService().addIncomingInterceptor(interceptor2);
-      server.getRemotingService().addIncomingInterceptor(interceptor3);
-      server.getRemotingService().addIncomingInterceptor(interceptor4);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals(1, message.getIntProperty("a").intValue());
-         Assert.assertEquals(2, message.getIntProperty("b").intValue());
-         Assert.assertEquals(3, message.getIntProperty("c").intValue());
-         Assert.assertEquals(4, message.getIntProperty("d").intValue());
-      }
-
-      server.getRemotingService().removeIncomingInterceptor(interceptor2);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals(1, message.getIntProperty("a").intValue());
-         Assert.assertFalse(message.containsProperty("b"));
-         Assert.assertEquals(3, message.getIntProperty("c").intValue());
-         Assert.assertEquals(4, message.getIntProperty("d").intValue());
-
-      }
-
-      interceptor3.setReject(true);
-
-      interceptor1.setWasCalled(false);
-      interceptor2.setWasCalled(false);
-      interceptor3.setWasCalled(false);
-      interceptor4.setWasCalled(false);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientMessage message = consumer.receiveImmediate();
-
-      Assert.assertNull(message);
-
-      Assert.assertTrue(interceptor1.wasCalled());
-      Assert.assertFalse(interceptor2.wasCalled());
-      Assert.assertTrue(interceptor3.wasCalled());
-      Assert.assertFalse(interceptor4.wasCalled());
-
-      session.close();
-   }
-
-   @Test
-   public void testClientMultipleInterceptors() throws Exception {
-      MyInterceptor6 interceptor1 = new MyInterceptor6("a", 1);
-      MyInterceptor6 interceptor2 = new MyInterceptor6("b", 2);
-      MyInterceptor6 interceptor3 = new MyInterceptor6("c", 3);
-      MyInterceptor6 interceptor4 = new MyInterceptor6("d", 4);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      sf.getServerLocator().addIncomingInterceptor(interceptor1);
-      sf.getServerLocator().addIncomingInterceptor(interceptor2);
-      sf.getServerLocator().addIncomingInterceptor(interceptor3);
-      sf.getServerLocator().addIncomingInterceptor(interceptor4);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals(1, message.getIntProperty("a").intValue());
-         Assert.assertEquals(2, message.getIntProperty("b").intValue());
-         Assert.assertEquals(3, message.getIntProperty("c").intValue());
-         Assert.assertEquals(4, message.getIntProperty("d").intValue());
-      }
-
-      sf.getServerLocator().removeIncomingInterceptor(interceptor2);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals(1, message.getIntProperty("a").intValue());
-         Assert.assertFalse(message.containsProperty("b"));
-         Assert.assertEquals(3, message.getIntProperty("c").intValue());
-         Assert.assertEquals(4, message.getIntProperty("d").intValue());
-
-      }
-
-      interceptor3.setReject(true);
-
-      interceptor1.setWasCalled(false);
-      interceptor2.setWasCalled(false);
-      interceptor3.setWasCalled(false);
-      interceptor4.setWasCalled(false);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientMessage message = consumer.receive(100);
-
-      Assert.assertNull(message);
-
-      Assert.assertTrue(interceptor1.wasCalled());
-      Assert.assertFalse(interceptor2.wasCalled());
-      Assert.assertTrue(interceptor3.wasCalled());
-      Assert.assertFalse(interceptor4.wasCalled());
-
-      session.close();
-   }
-
-   @Test
-   public void testServerOutgoingInterceptorChangeProperty() throws Exception {
-      MyOutgoingInterceptor1 interceptor = new MyOutgoingInterceptor1();
-
-      server.getRemotingService().addOutgoingInterceptor(interceptor);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putIntProperty("count", i);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         assertNotNull(message);
-
-         assertEquals(i, message.getIntProperty("count").intValue());
-
-         Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
-      }
-
-      server.getRemotingService().removeOutgoingInterceptor(interceptor);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         message.putStringProperty(InterceptorTest.key, "apple");
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertEquals("apple", message.getStringProperty(InterceptorTest.key));
-      }
-
-      session.close();
-   }
-
-   @Test
-   public void testServerOutgoingInterceptorRejectMessage() throws Exception {
-      MyOutgoingInterceptor2 interceptor = new MyOutgoingInterceptor2();
-
-      server.getRemotingService().addOutgoingInterceptor(interceptor);
-
-      locator.setBlockOnNonDurableSend(false);
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true, true);
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      final int numMessages = 10;
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = session.createMessage(false);
-
-         producer.send(message);
-      }
-
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-
-      session.start();
-
-      ClientMessage message = consumer.receiveImmediate();
-
-      Assert.assertNull(message);
-
-      session.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
index 7a20c9a..0d52d05 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
@@ -55,7 +55,7 @@ public class ClusterControllerTest extends ClusterTestBase {
    @Test
    public void controlWithDifferentConnector() throws Exception {
       try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
-         locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+         locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
          ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool());
          ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
          clusterControl.authorize();
@@ -65,7 +65,7 @@ public class ClusterControllerTest extends ClusterTestBase {
    @Test
    public void controlWithDifferentPassword() throws Exception {
       try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
-         locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+         locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
          ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool());
          ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index ae81eed..d3ece00 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1329,7 +1329,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
          locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
       }
 
-      locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
+      locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
 
       locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
       addServerLocator(locators[node]);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java
new file mode 100644
index 0000000..a517764
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.interceptors;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+
+public class Incoming implements Interceptor {
+
+   public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
+
+      System.out.println("Incoming:Packet : " + packet);
+      if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
+         SessionReceiveMessage p = (SessionReceiveMessage) packet;
+
+         p.getMessage().putStringProperty("Incoming", "was here");
+
+      }
+
+      return true;
+   }
+
+}


Mime
View raw message