activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/6] activemq-artemis git commit: ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support
Date Fri, 09 Oct 2015 02:55:55 GMT
ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/206acdac
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/206acdac
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/206acdac

Branch: refs/heads/master
Commit: 206acdac7d5d2581c78337988949c0dca07cb30a
Parents: 1c067a5
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Oct 7 22:37:34 2015 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Oct 8 20:32:43 2015 -0400

----------------------------------------------------------------------
 .../artemis/api/core/client/ServerLocator.java  |   16 +-
 .../core/client/impl/ServerLocatorImpl.java     |   83 +-
 .../core/client/impl/ServerLocatorInternal.java |    2 +-
 .../impl/ActiveMQClientProtocolManager.java     |   26 +-
 .../ActiveMQClientProtocolManagerFactory.java   |   21 +-
 .../core/impl/ActiveMQSessionContext.java       |   30 +-
 .../core/impl/wireformat/MessagePacket.java     |    2 +-
 .../core/impl/wireformat/MessagePacketI.java    |   24 +
 .../wireformat/SessionReceiveLargeMessage.java  |    8 +-
 .../wireformat/SessionSendLargeMessage.java     |    8 +-
 .../remoting/ClientProtocolManagerFactory.java  |    6 +
 .../jms/client/ActiveMQConnectionFactory.java   |   52 +
 .../artemis/uri/ConnectionFactoryURITest.java   |   13 +
 .../config/ConnectionFactoryConfiguration.java  |    6 +
 .../ConnectionFactoryConfigurationImpl.java     |   28 +-
 .../jms/server/impl/JMSServerManagerImpl.java   |    8 +-
 .../protocol/proton/ProtonProtocolManager.java  |    5 +
 .../HQPropertiesConversionInterceptor.java      |   68 +-
 .../hornetq/HornetQProtocolManager.java         |   12 +-
 .../hornetq/HornetQProtocolManagerFactory.java  |    5 +-
 .../client/HornetQClientProtocolManager.java    |   67 ++
 .../HornetQClientProtocolManagerFactory.java    |   46 +
 .../client/HornetQClientSessionContext.java     |  101 ++
 .../hornetq/util/HQPropertiesConverter.java     |   86 ++
 .../core/protocol/mqtt/MQTTProtocolManager.java |   10 +-
 .../openwire/OpenWireProtocolManager.java       |    5 +
 .../protocol/stomp/StompProtocolManager.java    |    5 +
 .../artemis/ra/ActiveMQResourceAdapter.java     |   20 +
 .../artemis/ra/ConnectionFactoryProperties.java |   17 +
 .../artemis/core/config/Configuration.java      |   11 +
 .../core/config/impl/ConfigurationImpl.java     |   30 +
 .../artemis/core/protocol/ProtocolHandler.java  |   10 +-
 .../protocol/core/impl/CoreProtocolManager.java |   23 +-
 ...ctiveMQServerSideProtocolManagerFactory.java |   20 +-
 .../core/server/cluster/BackupManager.java      |    4 +-
 .../core/server/cluster/ClusterController.java  |    4 +-
 .../cluster/impl/ClusterConnectionBridge.java   |    2 +-
 .../cluster/impl/ClusterConnectionImpl.java     |    4 +-
 .../impl/BackupRecoveryJournalLoader.java       |   10 +-
 .../core/server/impl/LiveOnlyActivation.java    |    2 +-
 .../spi/core/protocol/ProtocolManager.java      |    5 +
 .../artemis/tests/util/ActiveMQTestBase.java    |   16 +
 .../xa/recovery/ActiveMQXAResourceRecovery.java |    2 +-
 .../xa/recovery/XARecoveryConfig.java           |   46 +-
 .../tests/recovery/XARecoveryConfigTest.java    |   67 ++
 .../hornetq/HornetQProtocolManagerTest.java     |  138 +++
 .../protocols/hornetq/HornetQProtocolTest.java  |   55 +-
 .../tests/integration/InterceptorTest.java      | 1030 -----------------
 .../cluster/ClusterControllerTest.java          |    4 +-
 .../cluster/distribution/ClusterTestBase.java   |    2 +-
 .../integration/interceptors/Incoming.java      |   42 +
 .../interceptors/InterceptorTest.java           | 1089 ++++++++++++++++++
 .../integration/interceptors/Outgoing.java      |   41 +
 .../integration/jms/client/ConnectionTest.java  |   39 +-
 .../ra/ActiveMQResourceAdapterConfigTest.java   |    6 +
 .../ra/ConnectionFactoryPropertiesTest.java     |    2 +
 56 files changed, 2307 insertions(+), 1177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index 8f086c6..b198878 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -769,6 +769,20 @@ public interface ServerLocator extends AutoCloseable {
 
    ClientProtocolManagerFactory getProtocolManagerFactory();
 
-   void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
+   ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
+
+   /**
+    * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
+    * @return this
+    */
+   ServerLocator setIncomingInterceptorList(String interceptorList);
+
+   String getIncomingInterceptorList();
+
+   ServerLocator setOutgoingInterceptorList(String interceptorList);
+
+   String getOutgoingInterceptorList();
+
+
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index c979246..3f1eead 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -80,7 +80,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    private static final long serialVersionUID = -1615857864410205260L;
 
    // This is the default value
-   private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
+   private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
 
    private final boolean ha;
 
@@ -201,12 +201,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private TransportConfiguration clusterTransportConfiguration;
 
-   /*
-   * *************WARNING***************
-   * remember that when adding any new classes that we have to support serialization with previous clients.
-   * If you need to, make them transient and handle the serialization yourself
-   * */
-
    private final Exception traceException = new Exception();
 
    // To be called when there are ServerLocator being finalized.
@@ -619,14 +613,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    public ClientProtocolManagerFactory getProtocolManagerFactory() {
       if (protocolManagerFactory == null) {
-         // this could happen over serialization from older versions
-         protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
+         // Default one in case it's null
+         protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
       }
       return protocolManagerFactory;
    }
 
-   public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
+   public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
       this.protocolManagerFactory = protocolManagerFactory;
+      protocolManagerFactory.setLocator(this);
+      return this;
    }
 
    public void disableFinalizeCheck() {
@@ -860,10 +856,41 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return factory;
    }
 
+   @Override
    public boolean isHA() {
       return ha;
    }
 
+   /**
+    * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
+    * @return this
+    */
+   @Override
+   public ServerLocator setIncomingInterceptorList(String interceptorList) {
+      feedInterceptors(incomingInterceptors, interceptorList);
+      return this;
+   }
+
+   @Override
+   public String getIncomingInterceptorList() {
+      return fromInterceptors(incomingInterceptors);
+   }
+
+   /**
+    * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
+    * @return this
+    */
+   @Override
+   public ServerLocator setOutgoingInterceptorList(String interceptorList) {
+      feedInterceptors(outgoingInterceptors, interceptorList);
+      return this;
+   }
+
+   @Override
+   public String getOutgoingInterceptorList() {
+      return fromInterceptors(outgoingInterceptors);
+   }
+
    public boolean isCacheLargeMessagesClient() {
       return cacheLargeMessagesClient;
    }
@@ -1775,4 +1802,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    public boolean isReceivedToplogy() {
       return receivedTopology;
    }
+
+   private String fromInterceptors(final List<Interceptor> interceptors) {
+      StringBuffer buffer = new StringBuffer();
+      boolean first = true;
+      for (Interceptor value : interceptors) {
+         if (!first) {
+            buffer.append(",");
+         }
+         first = false;
+         buffer.append(value.getClass().getName());
+      }
+
+      return buffer.toString();
+   }
+
+   private void feedInterceptors(final List<Interceptor> interceptors,  final String interceptorList) {
+      interceptors.clear();
+
+      if (interceptorList == null || interceptorList.trim().equals("")) {
+         return;
+      }
+      AccessController.doPrivileged(new PrivilegedAction<Object>() {
+         public Object run() {
+
+            String[] arrayInterceptor = interceptorList.split(",");
+            for (String strValue : arrayInterceptor) {
+               Interceptor interceptor = (Interceptor) ClassloadingUtil.newInstanceFromClassLoader(strValue.trim());
+               interceptors.add(interceptor);
+            }
+            return null;
+         }
+      });
+
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
index 42da789..0f945aa 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
@@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.client.impl;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
 
 public interface ServerLocatorInternal extends ServerLocator {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 73ea529..b45fd5a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -276,7 +276,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
 
             long sessionChannelID = connection.generateChannelID();
 
-            Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
+            Packet request = newCreateSessionPacket(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize, sessionChannelID);
 
             try {
                // channel1 reference here has to go away
@@ -325,10 +325,30 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
             inCreateSessionLatch.countDown();
          }
       } while (retry);
+      return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
 
+   }
+
+   protected Packet newCreateSessionPacket(Version clientVersion,
+                                           String name,
+                                           String username,
+                                           String password,
+                                           boolean xa,
+                                           boolean autoCommitSends,
+                                           boolean autoCommitAcks,
+                                           boolean preAcknowledge,
+                                           int minLargeMessageSize,
+                                           int confirmationWindowSize,
+                                           long sessionChannelID) {
+      return new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
+   }
+
+   protected SessionContext newSessionContext(String name,
+                                            int confirmationWindowSize,
+                                            Channel sessionChannel,
+                                            CreateSessionResponseMessage response) {
       // these objects won't be null, otherwise it would keep retrying on the previous loop
       return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
-
    }
 
    public boolean cleanupBeforeFailover(ActiveMQException cause) {
@@ -398,7 +418,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
       return connection;
    }
 
-   private void sendHandshake(Connection transportConnection) {
+   protected void sendHandshake(Connection transportConnection) {
       if (transportConnection.isUsingProtocolHandling()) {
          // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
          ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
index a58834b..24727c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
 
@@ -23,13 +24,25 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
 
    private static final long serialVersionUID = 1;
 
-   private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory();
-
    private ActiveMQClientProtocolManagerFactory() {
    }
 
-   public static final ActiveMQClientProtocolManagerFactory getInstance() {
-      return INSTANCE;
+   ServerLocator locator;
+
+   @Override
+   public ServerLocator getLocator() {
+      return locator;
+   }
+
+   @Override
+   public void setLocator(ServerLocator locator) {
+      this.locator = locator;
+   }
+
+   public static final ActiveMQClientProtocolManagerFactory getInstance(ServerLocator locator) {
+      ActiveMQClientProtocolManagerFactory factory = new ActiveMQClientProtocolManagerFactory();
+      factory.setLocator(locator);
+      return factory;
    }
 
    public ClientProtocolManager newProtocolManager() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 5279de2..d8dc125 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -114,6 +114,19 @@ public class ActiveMQSessionContext extends SessionContext {
    private int confirmationWindow;
    private final String name;
 
+   protected Channel getSessionChannel() {
+      return sessionChannel;
+   }
+
+   protected String getName() {
+      return name;
+   }
+
+   protected int getConfirmationWindow() {
+      return confirmationWindow;
+
+   }
+
    public ActiveMQSessionContext(String name,
                                  RemotingConnection remotingConnection,
                                  Channel sessionChannel,
@@ -536,7 +549,7 @@ public class ActiveMQSessionContext extends SessionContext {
                                final boolean autoCommitAcks,
                                final boolean preAcknowledge,
                                final SimpleString defaultAddress) throws ActiveMQException {
-      Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
+      Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
       boolean retry;
       do {
          try {
@@ -564,6 +577,17 @@ public class ActiveMQSessionContext extends SessionContext {
       } while (retry && !session.isClosing());
    }
 
+   protected CreateSessionMessage newCreateSession(String username,
+                                                   String password,
+                                                   int minLargeMessageSize,
+                                                   boolean xa,
+                                                   boolean autoCommitSends,
+                                                   boolean autoCommitAcks,
+                                                   boolean preAcknowledge,
+                                                   SimpleString defaultAddress) {
+      return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
+   }
+
    @Override
    public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
       ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
@@ -724,7 +748,7 @@ public class ActiveMQSessionContext extends SessionContext {
       return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId();
    }
 
-   private ClassLoader lookupTCCL() {
+   protected ClassLoader lookupTCCL() {
       return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
          public ClassLoader run() {
             return Thread.currentThread().getContextClassLoader();
@@ -733,7 +757,7 @@ public class ActiveMQSessionContext extends SessionContext {
 
    }
 
-   private int calcWindowSize(final int windowSize) {
+   protected int calcWindowSize(final int windowSize) {
       int clientWindowSize;
       if (windowSize == -1) {
          // No flow control - buffer can increase without bound! Only use with

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index abcb233..4ed86ba 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public abstract class MessagePacket extends PacketImpl {
+public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
 
    protected MessageInternal message;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java
new file mode 100644
index 0000000..ea1146f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.Message;
+
+public interface MessagePacketI {
+   Message getMessage();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
index 460cd23..8b32256 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java
@@ -17,10 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class SessionReceiveLargeMessage extends PacketImpl {
+public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI {
 
    private final MessageInternal message;
 
@@ -58,6 +59,11 @@ public class SessionReceiveLargeMessage extends PacketImpl {
       return message;
    }
 
+   @Override
+   public Message getMessage() {
+      return message;
+   }
+
    public long getConsumerID() {
       return consumerID;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
index 1bf9bbb..3c7dbe7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java
@@ -17,10 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class SessionSendLargeMessage extends PacketImpl {
+public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
 
    /**
     * Used only if largeMessage
@@ -44,6 +45,11 @@ public class SessionSendLargeMessage extends PacketImpl {
    }
 
    @Override
+   public Message getMessage() {
+      return largeMessage;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       largeMessage.encodeHeadersAndProperties(buffer);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java
index c9c78a5..7e82238 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java
@@ -16,7 +16,13 @@
  */
 package org.apache.activemq.artemis.spi.core.remoting;
 
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
 public interface ClientProtocolManagerFactory {
 
    ClientProtocolManager newProtocolManager();
+
+   void setLocator(ServerLocator locator);
+
+   ServerLocator getLocator();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
index 912554e..aa29bc5 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
@@ -39,6 +39,8 @@ import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.net.URI;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -50,8 +52,10 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory;
 import org.apache.activemq.artemis.jms.referenceable.SerializableObjectRefAddr;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
 import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
 import org.apache.activemq.artemis.uri.ServerLocatorParser;
+import org.apache.activemq.artemis.utils.ClassloadingUtil;
 
 /**
  * <p>ActiveMQ Artemis implementation of a JMS ConnectionFactory.</p>
@@ -73,6 +77,8 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
 
    private String password;
 
+   private String protocolManagerFactoryStr;
+
    public void writeExternal(ObjectOutput out) throws IOException {
       URI uri = toURI();
 
@@ -121,6 +127,27 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
       return uri;
    }
 
+   public String getProtocolManagerFactoryStr() {
+      return protocolManagerFactoryStr;
+   }
+
+   public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
+
+      if (protocolManagerFactoryStr != null && !protocolManagerFactoryStr.trim().isEmpty()) {
+         AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            public Object run() {
+
+               ClientProtocolManagerFactory protocolManagerFactory =
+                  (ClientProtocolManagerFactory) ClassloadingUtil.newInstanceFromClassLoader(protocolManagerFactoryStr);
+               serverLocator.setProtocolManagerFactory(protocolManagerFactory);
+               return null;
+            }
+         });
+
+         this.protocolManagerFactoryStr = protocolManagerFactoryStr;
+      }
+   }
+
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
       String url = in.readUTF();
       ConnectionFactoryParser parser = new ConnectionFactoryParser();
@@ -606,6 +633,31 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
       serverLocator.setInitialMessagePacketSize(size);
    }
 
+   /**
+    * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
+    * @return this
+    */
+   public void setIncomingInterceptorList(String interceptorList) {
+      checkWrite();
+      serverLocator.setIncomingInterceptorList(interceptorList);
+   }
+
+   public String getIncomingInterceptorList() {
+      return serverLocator.getIncomingInterceptorList();
+   }
+
+   /**
+    * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
+    * @return this
+    */
+   public void setOutgoingInterceptorList(String interceptorList) {
+      serverLocator.setOutgoingInterceptorList(interceptorList);
+   }
+
+   public String getOutgoingInterceptorList() {
+      return serverLocator.getOutgoingInterceptorList();
+   }
+
    public ActiveMQConnectionFactory setUser(String user) {
       checkWrite();
       this.user = user;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java
index d374756..4863208 100644
--- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java
+++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java
@@ -26,6 +26,7 @@ import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -56,6 +57,14 @@ public class ConnectionFactoryURITest {
 
    private static final String IPV6 = "fe80::baf6:b1ff:fe12:daf7%eth0";
 
+   private static Set<String> ignoreList = new HashSet<String>();
+
+   static {
+      ignoreList.add("protocolManagerFactoryStr");
+      ignoreList.add("incomingInterceptorList");
+      ignoreList.add("outgoingInterceptorList");
+   }
+
    @Test
    public void testIPv6() throws Exception {
       Map<String,Object> params = new HashMap<>();
@@ -379,6 +388,10 @@ public class ConnectionFactoryURITest {
                          ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException {
       PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory);
       for (PropertyDescriptor descriptor : descriptors) {
+         if (ignoreList.contains(descriptor.getName())) {
+            continue;
+         }
+         System.err.println("name::" + descriptor.getName());
          if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null) {
             if (descriptor.getPropertyType() == String.class) {
                String value = RandomUtil.randomString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java
index 57a955f..ab4990c 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java
@@ -44,6 +44,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
 
    ConnectionFactoryConfiguration setConnectorNames(List<String> connectorNames);
 
+   ConnectionFactoryConfiguration setConnectorNames(String...connectorNames);
+
    boolean isHA();
 
    ConnectionFactoryConfiguration setHA(boolean ha);
@@ -170,5 +172,9 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
 
    ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType);
 
+   ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr);
+
+   String getProtocolManagerFactoryStr();
+
    JMSFactoryType getFactoryType();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
index 43c5385..b5efcd7 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.jms.server.config.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -113,6 +114,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
 
    private String groupID = null;
 
+   private String protocolManagerFactoryStr;
+
    private JMSFactoryType factoryType = JMSFactoryType.CF;
 
    // Static --------------------------------------------------------
@@ -170,6 +173,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
       return this;
    }
 
+
+   public ConnectionFactoryConfiguration setConnectorNames(final String...names) {
+      return this.setConnectorNames(Arrays.asList(names));
+   }
+
    public boolean isHA() {
       return ha;
    }
@@ -534,6 +542,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
       groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
 
       factoryType = JMSFactoryType.valueOf(buffer.readInt());
+
+      protocolManagerFactoryStr = BufferHelper.readNullableSimpleStringAsString(buffer);
    }
 
    @Override
@@ -618,6 +628,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
       BufferHelper.writeAsNullableSimpleString(buffer, groupID);
 
       buffer.writeInt(factoryType.intValue());
+
+      BufferHelper.writeAsNullableSimpleString(buffer, protocolManagerFactoryStr);
    }
 
    @Override
@@ -724,7 +736,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
 
          BufferHelper.sizeOfNullableSimpleString(groupID) +
 
-         DataConstants.SIZE_INT; // factoryType
+         DataConstants.SIZE_INT +
+          // factoryType
+
+         BufferHelper.sizeOfNullableSimpleString(protocolManagerFactoryStr);
 
       return size;
    }
@@ -749,6 +764,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
       return this.compressLargeMessage;
    }
 
+   @Override
+   public ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
+      this.protocolManagerFactoryStr = protocolManagerFactoryStr;
+      return this;
+   }
+
+   @Override
+   public String getProtocolManagerFactoryStr() {
+      return protocolManagerFactoryStr;
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index ff246f2..99e0daa 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -955,7 +955,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
          public void runException() throws Exception {
             checkBindings(bindings);
 
-            ActiveMQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
+            ActiveMQConnectionFactory cf = internalCreateCF(cfConfig);
 
             ArrayList<String> bindingsToAdd = new ArrayList<String>();
 
@@ -1075,8 +1075,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
     * @param cfConfig
     * @throws Exception
     */
-   private ActiveMQConnectionFactory internalCreateCF(final boolean persisted,
-                                                      final ConnectionFactoryConfiguration cfConfig) throws Exception {
+   private ActiveMQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws Exception {
       checkInitialised();
 
       ActiveMQConnectionFactory cf = connectionFactories.get(cfConfig.getName());
@@ -1168,6 +1167,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
       cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
       cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
       cf.setGroupID(cfConfig.getGroupID());
+      cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr());
       return cf;
    }
 
@@ -1445,7 +1445,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
       List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
 
       for (PersistedConnectionFactory cf : cfs) {
-         internalCreateCF(true, cf.getConfig());
+         internalCreateCF(cf.getConfig());
       }
 
       List<PersistedDestination> destinations = storage.recoverDestinations();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 3637289..9ac41ef 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -86,6 +86,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    }
 
    @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
+   @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
       ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);
       long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
index cb47e85..012727f 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
@@ -19,76 +19,36 @@ package org.apache.activemq.artemis.core.protocol.hornetq;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
+import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
 public class HQPropertiesConversionInterceptor implements Interceptor {
 
-   private static Map<SimpleString, SimpleString> dictionary;
-
-   static {
-      Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
-
-      // Add entries for outgoing messages
-      d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
-      d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
-      d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
-      d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
-      d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
-      d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
-      d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
-      d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
-      d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
-      d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
 
-      // Add entries for incoming messages
-      d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
-      d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
-      d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
-      d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
-      d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
-      d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
-      d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
-      d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
-      d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
-      d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
+   private final boolean replaceHQ;
 
-      dictionary = Collections.unmodifiableMap(d);
+   public HQPropertiesConversionInterceptor(final boolean replaceHQ) {
+      this.replaceHQ = replaceHQ;
    }
 
    @Override
    public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
-      if (isMessagePacket(packet)) {
-         handleReceiveMessage((MessagePacket) packet);
+
+      if (HQPropertiesConverter.isMessagePacket(packet)) {
+         handleReceiveMessage((MessagePacketI) packet);
       }
       return true;
    }
 
-   private void handleReceiveMessage(MessagePacket messagePacket) {
-      Message message = messagePacket.getMessage();
-      // We are modifying the key set so we iterate over a shallow copy.
-      for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
-         if (dictionary.containsKey(property)) {
-            message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
-         }
+   private void handleReceiveMessage(MessagePacketI messagePacket) {
+      if (replaceHQ) {
+         HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage());
+      }
+      else {
+         HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage());
       }
    }
 
-   private boolean isMessagePacket(Packet packet) {
-      int type = packet.getType();
-      return type == PacketImpl.SESS_SEND ||
-         type == PacketImpl.SESS_SEND_CONTINUATION ||
-         type == PacketImpl.SESS_SEND_LARGE ||
-         type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
-         type == PacketImpl.SESS_RECEIVE_MSG;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
index 3d6dab5..bd4274a 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.protocol.hornetq;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
@@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
 /**
  * HornetQ Protocol Manager
  */
@@ -54,6 +54,12 @@ class HornetQProtocolManager extends CoreProtocolManager {
    }
 
    @Override
+   public boolean acceptsNoHandshake() {
+      return true;
+   }
+
+
+   @Override
    public boolean isProtocol(byte[] array) {
       String frameStart = new String(array, StandardCharsets.US_ASCII);
       return frameStart.startsWith("HORNETQ");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
index a163459..33c9a78 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
@@ -34,9 +34,8 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
    public ProtocolManager createProtocolManager(final ActiveMQServer server,
                                                 final List<Interceptor> incomingInterceptors,
                                                 List<Interceptor> outgoingInterceptors) {
-      Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
-      incomingInterceptors.add(propertyConversionInterceptor);
-      outgoingInterceptors.add(propertyConversionInterceptor);
+      incomingInterceptors.add(new HQPropertiesConversionInterceptor(true));
+      outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false));
       return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java
new file mode 100644
index 0000000..a1d9a60
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.hornetq.client;
+
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+import org.apache.activemq.artemis.core.version.Version;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+
+public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager {
+
+   private static final int VERSION_PLAYED = 123;
+   protected void sendHandshake(Connection transportConnection) {
+   }
+
+
+   protected SessionContext newSessionContext(String name,
+                                              int confirmationWindowSize,
+                                              Channel sessionChannel,
+                                              CreateSessionResponseMessage response) {
+      // these objects won't be null, otherwise it would keep retrying on the previous loop
+      return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
+   }
+
+   @Override
+   protected Packet newCreateSessionPacket(Version clientVersion,
+                                           String name,
+                                           String username,
+                                           String password,
+                                           boolean xa,
+                                           boolean autoCommitSends,
+                                           boolean autoCommitAcks,
+                                           boolean preAcknowledge,
+                                           int minLargeMessageSize,
+                                           int confirmationWindowSize,
+                                           long sessionChannelID) {
+      return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
+   }
+
+   @Override
+   public void sendSubscribeTopology(final boolean isServer) {
+      getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED));
+   }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java
new file mode 100644
index 0000000..ed57cfe
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.hornetq.client;
+
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
+
+public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory {
+
+
+   ServerLocator locator;
+
+   @Override
+   public ServerLocator getLocator() {
+      return locator;
+   }
+
+   @Override
+   public void setLocator(ServerLocator locator) {
+      this.locator = locator;
+      locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true));
+      locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
+   }
+
+   @Override
+   public ClientProtocolManager newProtocolManager() {
+      return new HornetQClientProtocolManager();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
new file mode 100644
index 0000000..169a82a
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.hornetq.client;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
+import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
+
+public class HornetQClientSessionContext extends ActiveMQSessionContext {
+
+   public HornetQClientSessionContext(String name,
+                                      RemotingConnection remotingConnection,
+                                      Channel sessionChannel,
+                                      int serverVersion,
+                                      int confirmationWindow) {
+      super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow);
+   }
+
+
+   public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
+      SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
+      SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      return response.toQueueQuery();
+   }
+
+   protected CreateSessionMessage newCreateSession(String username,
+                                                   String password,
+                                                   int minLargeMessageSize,
+                                                   boolean xa,
+                                                   boolean autoCommitSends,
+                                                   boolean autoCommitAcks,
+                                                   boolean preAcknowledge,
+                                                   SimpleString defaultAddress) {
+      return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
+   }
+
+
+   public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
+      SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
+
+      return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false);
+   }
+
+   public ClientConsumerInternal createConsumer(SimpleString queueName,
+                                                SimpleString filterString,
+                                                int windowSize,
+                                                int maxRate,
+                                                int ackBatchSize,
+                                                boolean browseOnly,
+                                                Executor executor,
+                                                Executor flowControlExecutor) throws ActiveMQException {
+      long consumerID = idGenerator.generateID();
+
+      ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
+
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+
+      SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      // The actual windows size that gets used is determined by the user since
+      // could be overridden on the queue settings
+      // The value we send is just a hint
+
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java
new file mode 100644
index 0000000..9240e55
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.hornetq.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+public class HQPropertiesConverter {
+
+   private static Map<SimpleString, SimpleString> hqAmqDictionary;
+   private static Map<SimpleString, SimpleString> amqHqDictionary;
+
+   static {
+      Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
+
+      // Add entries for outgoing messages
+      d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
+      d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
+      d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
+      d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
+      d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
+      d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
+      d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
+      d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
+      d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
+      d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
+
+      hqAmqDictionary = Collections.unmodifiableMap(d);
+
+      d = new HashMap<>();
+      // inverting the direction
+      for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) {
+         d.put(entry.getValue(), entry.getKey());
+      }
+
+      amqHqDictionary = Collections.unmodifiableMap(d);
+   }
+
+   public static void replaceAMQProperties(final Message message) {
+      replaceDict(message, amqHqDictionary);
+   }
+
+   public static void replaceHQProperties(final Message message) {
+      replaceDict(message, hqAmqDictionary);
+   }
+
+   private static void replaceDict(final Message message, Map<SimpleString, SimpleString> dictionary) {
+      for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
+         SimpleString replaceTo = dictionary.get(property);
+         if (replaceTo != null) {
+            message.putObjectProperty(replaceTo, message.removeProperty(property));
+         }
+      }
+   }
+
+   public static boolean isMessagePacket(Packet packet) {
+      int type = packet.getType();
+      return type == PacketImpl.SESS_SEND ||
+         type == PacketImpl.SESS_SEND_LARGE ||
+         type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
+         type == PacketImpl.SESS_RECEIVE_MSG;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index ce75e4d..e99272f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.util.List;
+
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.mqtt.MqttDecoder;
 import io.netty.handler.codec.mqtt.MqttEncoder;
@@ -32,8 +34,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
-import java.util.List;
-
 /**
  * MQTTProtocolManager
  */
@@ -80,6 +80,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
    }
 
    @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
+
+   @Override
    public void removeHandler(String name) {
       // TODO add support for handlers
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 04be7aa..9b35b90 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -150,6 +150,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    }
 
+   @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
    public ProtocolManagerFactory<Interceptor> getFactory() {
       return factory;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 98d21e4..6dcd351 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -105,6 +105,11 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
    }
 
    @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
+   @Override
    public ProtocolManagerFactory<StompFrameInterceptor> getFactory() {
       return factory;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index b4370c9..2a40e01 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -879,6 +879,22 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       raProperties.setProducerWindowSize(producerWindowSize);
    }
 
+   public String getProtocolManagerFactoryStr() {
+      if (ActiveMQResourceAdapter.trace) {
+         ActiveMQRALogger.LOGGER.trace("getProtocolManagerFactoryStr()");
+      }
+
+      return raProperties.getProtocolManagerFactoryStr();
+   }
+
+   public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
+      if (ActiveMQResourceAdapter.trace) {
+         ActiveMQRALogger.LOGGER.trace("setProtocolManagerFactoryStr(" + protocolManagerFactoryStr + ")");
+      }
+
+      raProperties.setProtocolManagerFactoryStr(protocolManagerFactoryStr);
+   }
+
    /**
     * Get min large message size
     *
@@ -1971,6 +1987,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       if (val5 != null) {
          cf.setConnectionLoadBalancingPolicyClassName(val5);
       }
+      val5 = overrideProperties.getProtocolManagerFactoryStr() != null ? overrideProperties.getProtocolManagerFactoryStr() : raProperties.getProtocolManagerFactoryStr();
+      if (val5 != null) {
+         cf.setProtocolManagerFactoryStr(val5);
+      }
    }
 
    public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activeMQRAManagedConnectionFactory) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java
index 9edd1d8..2137186 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java
@@ -118,6 +118,8 @@ public class ConnectionFactoryProperties {
 
    private String groupID;
 
+   private String protocolManagerFactoryStr;
+
    /**
     * @return the transportType
     */
@@ -679,6 +681,14 @@ public class ConnectionFactoryProperties {
       hasBeenUpdated = true;
    }
 
+   public String getProtocolManagerFactoryStr() {
+      return protocolManagerFactoryStr;
+   }
+
+   public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
+      this.protocolManagerFactoryStr = protocolManagerFactoryStr;
+   }
+
    public boolean isHasBeenUpdated() {
       return hasBeenUpdated;
    }
@@ -890,6 +900,12 @@ public class ConnectionFactoryProperties {
       }
       else if (!this.producerWindowSize.equals(other.producerWindowSize))
          return false;
+      else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr))
+         return false;
+      if (this.protocolManagerFactoryStr == null) {
+         if (other.protocolManagerFactoryStr != null)
+            return false;
+      }
       if (this.reconnectAttempts == null) {
          if (other.reconnectAttempts != null)
             return false;
@@ -971,6 +987,7 @@ public class ConnectionFactoryProperties {
       result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode());
       result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode());
       result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode());
+      result = prime * result + ((protocolManagerFactoryStr == null) ? 0 : protocolManagerFactoryStr.hashCode());
       result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode());
       result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode());
       result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index ed9edb1..4a69128 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -257,6 +257,15 @@ public interface Configuration {
 
    Configuration addAcceptorConfiguration(final TransportConfiguration infos);
 
+   /**
+    * Add an acceptor to the config
+    * @param name the name of the acceptor
+    * @param uri the URI of the acceptor
+    * @return this
+    * @throws Exception in case of Parsing errors on the URI
+    */
+   Configuration addAcceptorConfiguration(String name, String uri) throws Exception;
+
    Configuration clearAcceptorConfigurations();
 
    /**
@@ -271,6 +280,8 @@ public interface Configuration {
 
    Configuration addConnectorConfiguration(final String key, final TransportConfiguration info);
 
+   Configuration addConnectorConfiguration(final String name, final String uri) throws Exception;
+
    Configuration clearConnectorConfigurations();
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 8b21f8a..b8b4c2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -51,6 +51,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
+import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
+import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
 import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
 
 public class ConfigurationImpl implements Configuration, Serializable {
@@ -337,6 +339,19 @@ public class ConfigurationImpl implements Configuration, Serializable {
       return this;
    }
 
+   public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception {
+
+      AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
+
+      List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
+
+      for (TransportConfiguration config : configurations) {
+         addAcceptorConfiguration(config);
+      }
+
+      return this;
+   }
+
    public ConfigurationImpl clearAcceptorConfigurations() {
       acceptorConfigs.clear();
       return this;
@@ -356,6 +371,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
       return this;
    }
 
+
+   public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
+
+      ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
+
+      List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
+
+      for (TransportConfiguration config : configurations) {
+         addConnectorConfiguration(name, config);
+      }
+
+      return this;
+   }
+
+
    public ConfigurationImpl clearConnectorConfigurations() {
       connectorConfigs.clear();
       return this;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index e332887..78a5a62 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -154,7 +154,15 @@ public class ProtocolHandler {
 
          //if we get here we assume we use the core protocol as we match nothing else
          if (protocolToUse == null) {
-            protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
+            for (Map.Entry<String, ProtocolManager> entry : protocolMap.entrySet()) {
+               if (entry.getValue().acceptsNoHandshake()) {
+                  protocolToUse = entry.getKey();
+                  break;
+               }
+            }
+            if (protocolToUse == null) {
+               protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
+            }
          }
          ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
          ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index da6e5b1..6295ed6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -98,6 +98,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
       this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
    }
 
+   @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
    /**
     * no need to implement this now
     *
@@ -162,23 +167,25 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
 
    @Override
    public boolean isProtocol(byte[] array) {
-      String frameStart = new String(array, StandardCharsets.US_ASCII);
-      return frameStart.startsWith("ACTIVEMQ");
+      return isArtemis(ActiveMQBuffers.wrappedBuffer(array));
    }
 
    @Override
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
       //if we are not an old client then handshake
-      if (buffer.getByte(0) == 'A' &&
+      if (isArtemis(buffer)) {
+         buffer.readBytes(7);
+      }
+   }
+
+   private boolean isArtemis(ActiveMQBuffer buffer) {
+      return buffer.getByte(0) == 'A' &&
          buffer.getByte(1) == 'R' &&
          buffer.getByte(2) == 'T' &&
          buffer.getByte(3) == 'E' &&
          buffer.getByte(4) == 'M' &&
          buffer.getByte(5) == 'I' &&
-         buffer.getByte(6) == 'S') {
-         //todo add some handshaking
-         buffer.readBytes(7);
-      }
+         buffer.getByte(6) == 'S';
    }
 
    @Override


Mime
View raw message