Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8F48F185B2 for ; Fri, 9 Oct 2015 02:55:53 +0000 (UTC) Received: (qmail 19039 invoked by uid 500); 9 Oct 2015 02:55:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 18981 invoked by uid 500); 9 Oct 2015 02:55:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 18711 invoked by uid 99); 9 Oct 2015 02:55:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 02:55:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DED63E1072; Fri, 9 Oct 2015 02:55:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 09 Oct 2015 02:55:55 -0000 Message-Id: In-Reply-To: <1d98cb3e3c6f46e89b074a81d6e67a36@git.apache.org> References: <1d98cb3e3c6f46e89b074a81d6e67a36@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/6] activemq-artemis git commit: ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/206acdac Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/206acdac Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/206acdac Branch: refs/heads/master Commit: 206acdac7d5d2581c78337988949c0dca07cb30a Parents: 1c067a5 Author: Clebert Suconic Authored: Wed Oct 7 22:37:34 2015 -0400 Committer: Clebert Suconic Committed: Thu Oct 8 20:32:43 2015 -0400 ---------------------------------------------------------------------- .../artemis/api/core/client/ServerLocator.java | 16 +- .../core/client/impl/ServerLocatorImpl.java | 83 +- .../core/client/impl/ServerLocatorInternal.java | 2 +- .../impl/ActiveMQClientProtocolManager.java | 26 +- .../ActiveMQClientProtocolManagerFactory.java | 21 +- .../core/impl/ActiveMQSessionContext.java | 30 +- .../core/impl/wireformat/MessagePacket.java | 2 +- .../core/impl/wireformat/MessagePacketI.java | 24 + .../wireformat/SessionReceiveLargeMessage.java | 8 +- .../wireformat/SessionSendLargeMessage.java | 8 +- .../remoting/ClientProtocolManagerFactory.java | 6 + .../jms/client/ActiveMQConnectionFactory.java | 52 + .../artemis/uri/ConnectionFactoryURITest.java | 13 + .../config/ConnectionFactoryConfiguration.java | 6 + .../ConnectionFactoryConfigurationImpl.java | 28 +- .../jms/server/impl/JMSServerManagerImpl.java | 8 +- .../protocol/proton/ProtonProtocolManager.java | 5 + .../HQPropertiesConversionInterceptor.java | 68 +- .../hornetq/HornetQProtocolManager.java | 12 +- .../hornetq/HornetQProtocolManagerFactory.java | 5 +- .../client/HornetQClientProtocolManager.java | 67 ++ .../HornetQClientProtocolManagerFactory.java | 46 + .../client/HornetQClientSessionContext.java | 101 ++ .../hornetq/util/HQPropertiesConverter.java | 86 ++ .../core/protocol/mqtt/MQTTProtocolManager.java | 10 +- .../openwire/OpenWireProtocolManager.java | 5 + .../protocol/stomp/StompProtocolManager.java | 5 + .../artemis/ra/ActiveMQResourceAdapter.java | 20 + .../artemis/ra/ConnectionFactoryProperties.java | 17 + .../artemis/core/config/Configuration.java | 11 + .../core/config/impl/ConfigurationImpl.java | 30 + .../artemis/core/protocol/ProtocolHandler.java | 10 +- .../protocol/core/impl/CoreProtocolManager.java | 23 +- ...ctiveMQServerSideProtocolManagerFactory.java | 20 +- .../core/server/cluster/BackupManager.java | 4 +- .../core/server/cluster/ClusterController.java | 4 +- .../cluster/impl/ClusterConnectionBridge.java | 2 +- .../cluster/impl/ClusterConnectionImpl.java | 4 +- .../impl/BackupRecoveryJournalLoader.java | 10 +- .../core/server/impl/LiveOnlyActivation.java | 2 +- .../spi/core/protocol/ProtocolManager.java | 5 + .../artemis/tests/util/ActiveMQTestBase.java | 16 + .../xa/recovery/ActiveMQXAResourceRecovery.java | 2 +- .../xa/recovery/XARecoveryConfig.java | 46 +- .../tests/recovery/XARecoveryConfigTest.java | 67 ++ .../hornetq/HornetQProtocolManagerTest.java | 138 +++ .../protocols/hornetq/HornetQProtocolTest.java | 55 +- .../tests/integration/InterceptorTest.java | 1030 ----------------- .../cluster/ClusterControllerTest.java | 4 +- .../cluster/distribution/ClusterTestBase.java | 2 +- .../integration/interceptors/Incoming.java | 42 + .../interceptors/InterceptorTest.java | 1089 ++++++++++++++++++ .../integration/interceptors/Outgoing.java | 41 + .../integration/jms/client/ConnectionTest.java | 39 +- .../ra/ActiveMQResourceAdapterConfigTest.java | 6 + .../ra/ConnectionFactoryPropertiesTest.java | 2 + 56 files changed, 2307 insertions(+), 1177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 8f086c6..b198878 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -769,6 +769,20 @@ public interface ServerLocator extends AutoCloseable { ClientProtocolManagerFactory getProtocolManagerFactory(); - void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager); + ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager); + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + ServerLocator setIncomingInterceptorList(String interceptorList); + + String getIncomingInterceptorList(); + + ServerLocator setOutgoingInterceptorList(String interceptorList); + + String getOutgoingInterceptorList(); + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c979246..3f1eead 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -80,7 +80,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private static final long serialVersionUID = -1615857864410205260L; // This is the default value - private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); + private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this); private final boolean ha; @@ -201,12 +201,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration clusterTransportConfiguration; - /* - * *************WARNING*************** - * remember that when adding any new classes that we have to support serialization with previous clients. - * If you need to, make them transient and handle the serialization yourself - * */ - private final Exception traceException = new Exception(); // To be called when there are ServerLocator being finalized. @@ -619,14 +613,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientProtocolManagerFactory getProtocolManagerFactory() { if (protocolManagerFactory == null) { - // this could happen over serialization from older versions - protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); + // Default one in case it's null + protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this); } return protocolManagerFactory; } - public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) { + public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) { this.protocolManagerFactory = protocolManagerFactory; + protocolManagerFactory.setLocator(this); + return this; } public void disableFinalizeCheck() { @@ -860,10 +856,41 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } + @Override public boolean isHA() { return ha; } + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + @Override + public ServerLocator setIncomingInterceptorList(String interceptorList) { + feedInterceptors(incomingInterceptors, interceptorList); + return this; + } + + @Override + public String getIncomingInterceptorList() { + return fromInterceptors(incomingInterceptors); + } + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + @Override + public ServerLocator setOutgoingInterceptorList(String interceptorList) { + feedInterceptors(outgoingInterceptors, interceptorList); + return this; + } + + @Override + public String getOutgoingInterceptorList() { + return fromInterceptors(outgoingInterceptors); + } + public boolean isCacheLargeMessagesClient() { return cacheLargeMessagesClient; } @@ -1775,4 +1802,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public boolean isReceivedToplogy() { return receivedTopology; } + + private String fromInterceptors(final List interceptors) { + StringBuffer buffer = new StringBuffer(); + boolean first = true; + for (Interceptor value : interceptors) { + if (!first) { + buffer.append(","); + } + first = false; + buffer.append(value.getClass().getName()); + } + + return buffer.toString(); + } + + private void feedInterceptors(final List interceptors, final String interceptorList) { + interceptors.clear(); + + if (interceptorList == null || interceptorList.trim().equals("")) { + return; + } + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + + String[] arrayInterceptor = interceptorList.split(","); + for (String strValue : arrayInterceptor) { + Interceptor interceptor = (Interceptor) ClassloadingUtil.newInstanceFromClassLoader(strValue.trim()); + interceptors.add(interceptor); + } + return null; + } + }); + + } + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java index 42da789..0f945aa 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java @@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.client.impl; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; public interface ServerLocatorInternal extends ServerLocator { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index 73ea529..b45fd5a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -276,7 +276,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { long sessionChannelID = connection.generateChannelID(); - Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + Packet request = newCreateSessionPacket(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize, sessionChannelID); try { // channel1 reference here has to go away @@ -325,10 +325,30 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { inCreateSessionLatch.countDown(); } } while (retry); + return newSessionContext(name, confirmationWindowSize, sessionChannel, response); + } + + protected Packet newCreateSessionPacket(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize, + long sessionChannelID) { + return new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + } + + protected SessionContext newSessionContext(String name, + int confirmationWindowSize, + Channel sessionChannel, + CreateSessionResponseMessage response) { // these objects won't be null, otherwise it would keep retrying on the previous loop return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); - } public boolean cleanupBeforeFailover(ActiveMQException cause) { @@ -398,7 +418,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { return connection; } - private void sendHandshake(Connection transportConnection) { + protected void sendHandshake(Connection transportConnection) { if (transportConnection.isUsingProtocolHandling()) { // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java index a58834b..24727c9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; @@ -23,13 +24,25 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag private static final long serialVersionUID = 1; - private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory(); - private ActiveMQClientProtocolManagerFactory() { } - public static final ActiveMQClientProtocolManagerFactory getInstance() { - return INSTANCE; + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + } + + public static final ActiveMQClientProtocolManagerFactory getInstance(ServerLocator locator) { + ActiveMQClientProtocolManagerFactory factory = new ActiveMQClientProtocolManagerFactory(); + factory.setLocator(locator); + return factory; } public ClientProtocolManager newProtocolManager() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 5279de2..d8dc125 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -114,6 +114,19 @@ public class ActiveMQSessionContext extends SessionContext { private int confirmationWindow; private final String name; + protected Channel getSessionChannel() { + return sessionChannel; + } + + protected String getName() { + return name; + } + + protected int getConfirmationWindow() { + return confirmationWindow; + + } + public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, @@ -536,7 +549,7 @@ public class ActiveMQSessionContext extends SessionContext { final boolean autoCommitAcks, final boolean preAcknowledge, final SimpleString defaultAddress) throws ActiveMQException { - Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); boolean retry; do { try { @@ -564,6 +577,17 @@ public class ActiveMQSessionContext extends SessionContext { } while (retry && !session.isClosing()); } + protected CreateSessionMessage newCreateSession(String username, + String password, + int minLargeMessageSize, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + SimpleString defaultAddress) { + return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + } + @Override public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException { ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); @@ -724,7 +748,7 @@ public class ActiveMQSessionContext extends SessionContext { return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId(); } - private ClassLoader lookupTCCL() { + protected ClassLoader lookupTCCL() { return AccessController.doPrivileged(new PrivilegedAction() { public ClassLoader run() { return Thread.currentThread().getContextClassLoader(); @@ -733,7 +757,7 @@ public class ActiveMQSessionContext extends SessionContext { } - private int calcWindowSize(final int windowSize) { + protected int calcWindowSize(final int windowSize) { int clientWindowSize; if (windowSize == -1) { // No flow control - buffer can increase without bound! Only use with http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index abcb233..4ed86ba 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public abstract class MessagePacket extends PacketImpl { +public abstract class MessagePacket extends PacketImpl implements MessagePacketI { protected MessageInternal message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java new file mode 100644 index 0000000..ea1146f --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.Message; + +public interface MessagePacketI { + Message getMessage(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java index 460cd23..8b32256 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionReceiveLargeMessage extends PacketImpl { +public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI { private final MessageInternal message; @@ -58,6 +59,11 @@ public class SessionReceiveLargeMessage extends PacketImpl { return message; } + @Override + public Message getMessage() { + return message; + } + public long getConsumerID() { return consumerID; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java index 1bf9bbb..3c7dbe7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionSendLargeMessage extends PacketImpl { +public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI { /** * Used only if largeMessage @@ -44,6 +45,11 @@ public class SessionSendLargeMessage extends PacketImpl { } @Override + public Message getMessage() { + return largeMessage; + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { largeMessage.encodeHeadersAndProperties(buffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java index c9c78a5..7e82238 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java @@ -16,7 +16,13 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import org.apache.activemq.artemis.api.core.client.ServerLocator; + public interface ClientProtocolManagerFactory { ClientProtocolManager newProtocolManager(); + + void setLocator(ServerLocator locator); + + ServerLocator getLocator(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index 912554e..aa29bc5 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -39,6 +39,8 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.net.URI; +import java.security.AccessController; +import java.security.PrivilegedAction; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -50,8 +52,10 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory; import org.apache.activemq.artemis.jms.referenceable.SerializableObjectRefAddr; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.uri.ConnectionFactoryParser; import org.apache.activemq.artemis.uri.ServerLocatorParser; +import org.apache.activemq.artemis.utils.ClassloadingUtil; /** *

ActiveMQ Artemis implementation of a JMS ConnectionFactory.

@@ -73,6 +77,8 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, private String password; + private String protocolManagerFactoryStr; + public void writeExternal(ObjectOutput out) throws IOException { URI uri = toURI(); @@ -121,6 +127,27 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, return uri; } + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + + public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) { + + if (protocolManagerFactoryStr != null && !protocolManagerFactoryStr.trim().isEmpty()) { + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + + ClientProtocolManagerFactory protocolManagerFactory = + (ClientProtocolManagerFactory) ClassloadingUtil.newInstanceFromClassLoader(protocolManagerFactoryStr); + serverLocator.setProtocolManagerFactory(protocolManagerFactory); + return null; + } + }); + + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + } + } + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { String url = in.readUTF(); ConnectionFactoryParser parser = new ConnectionFactoryParser(); @@ -606,6 +633,31 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, serverLocator.setInitialMessagePacketSize(size); } + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + public void setIncomingInterceptorList(String interceptorList) { + checkWrite(); + serverLocator.setIncomingInterceptorList(interceptorList); + } + + public String getIncomingInterceptorList() { + return serverLocator.getIncomingInterceptorList(); + } + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + public void setOutgoingInterceptorList(String interceptorList) { + serverLocator.setOutgoingInterceptorList(interceptorList); + } + + public String getOutgoingInterceptorList() { + return serverLocator.getOutgoingInterceptorList(); + } + public ActiveMQConnectionFactory setUser(String user) { checkWrite(); this.user = user; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java index d374756..4863208 100644 --- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java +++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java @@ -26,6 +26,7 @@ import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -56,6 +57,14 @@ public class ConnectionFactoryURITest { private static final String IPV6 = "fe80::baf6:b1ff:fe12:daf7%eth0"; + private static Set ignoreList = new HashSet(); + + static { + ignoreList.add("protocolManagerFactoryStr"); + ignoreList.add("incomingInterceptorList"); + ignoreList.add("outgoingInterceptorList"); + } + @Test public void testIPv6() throws Exception { Map params = new HashMap<>(); @@ -379,6 +388,10 @@ public class ConnectionFactoryURITest { ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException { PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory); for (PropertyDescriptor descriptor : descriptors) { + if (ignoreList.contains(descriptor.getName())) { + continue; + } + System.err.println("name::" + descriptor.getName()); if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null) { if (descriptor.getPropertyType() == String.class) { String value = RandomUtil.randomString(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java index 57a955f..ab4990c 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java @@ -44,6 +44,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport { ConnectionFactoryConfiguration setConnectorNames(List connectorNames); + ConnectionFactoryConfiguration setConnectorNames(String...connectorNames); + boolean isHA(); ConnectionFactoryConfiguration setHA(boolean ha); @@ -170,5 +172,9 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport { ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType); + ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr); + + String getProtocolManagerFactoryStr(); + JMSFactoryType getFactoryType(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java index 43c5385..b5efcd7 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jms.server.config.impl; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -113,6 +114,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf private String groupID = null; + private String protocolManagerFactoryStr; + private JMSFactoryType factoryType = JMSFactoryType.CF; // Static -------------------------------------------------------- @@ -170,6 +173,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this; } + + public ConnectionFactoryConfiguration setConnectorNames(final String...names) { + return this.setConnectorNames(Arrays.asList(names)); + } + public boolean isHA() { return ha; } @@ -534,6 +542,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf groupID = BufferHelper.readNullableSimpleStringAsString(buffer); factoryType = JMSFactoryType.valueOf(buffer.readInt()); + + protocolManagerFactoryStr = BufferHelper.readNullableSimpleStringAsString(buffer); } @Override @@ -618,6 +628,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.writeAsNullableSimpleString(buffer, groupID); buffer.writeInt(factoryType.intValue()); + + BufferHelper.writeAsNullableSimpleString(buffer, protocolManagerFactoryStr); } @Override @@ -724,7 +736,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.sizeOfNullableSimpleString(groupID) + - DataConstants.SIZE_INT; // factoryType + DataConstants.SIZE_INT + + // factoryType + + BufferHelper.sizeOfNullableSimpleString(protocolManagerFactoryStr); return size; } @@ -749,6 +764,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this.compressLargeMessage; } + @Override + public ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr) { + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + return this; + } + + @Override + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index ff246f2..99e0daa 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -955,7 +955,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback public void runException() throws Exception { checkBindings(bindings); - ActiveMQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig); + ActiveMQConnectionFactory cf = internalCreateCF(cfConfig); ArrayList bindingsToAdd = new ArrayList(); @@ -1075,8 +1075,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * @param cfConfig * @throws Exception */ - private ActiveMQConnectionFactory internalCreateCF(final boolean persisted, - final ConnectionFactoryConfiguration cfConfig) throws Exception { + private ActiveMQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws Exception { checkInitialised(); ActiveMQConnectionFactory cf = connectionFactories.get(cfConfig.getName()); @@ -1168,6 +1167,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection()); cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages()); cf.setGroupID(cfConfig.getGroupID()); + cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr()); return cf; } @@ -1445,7 +1445,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback List cfs = storage.recoverConnectionFactories(); for (PersistedConnectionFactory cf : cfs) { - internalCreateCF(true, cf.getConfig()); + internalCreateCF(cf.getConfig()); } List destinations = storage.recoverDestinations(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 3637289..9ac41ef 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -86,6 +86,11 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } @Override + public boolean acceptsNoHandshake() { + return false; + } + + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection); long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java index cb47e85..012727f 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -19,76 +19,36 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; +import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - public class HQPropertiesConversionInterceptor implements Interceptor { - private static Map dictionary; - - static { - Map d = new HashMap(); - - // Add entries for outgoing messages - d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); - d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); - d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); - d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); - d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); - d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); - d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); - d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); - d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); - d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); - // Add entries for incoming messages - d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY")); - d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS")); - d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE")); - d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID")); - d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID")); - d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED")); - d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE")); - d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY")); - d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID")); - d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME")); + private final boolean replaceHQ; - dictionary = Collections.unmodifiableMap(d); + public HQPropertiesConversionInterceptor(final boolean replaceHQ) { + this.replaceHQ = replaceHQ; } @Override public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { - if (isMessagePacket(packet)) { - handleReceiveMessage((MessagePacket) packet); + + if (HQPropertiesConverter.isMessagePacket(packet)) { + handleReceiveMessage((MessagePacketI) packet); } return true; } - private void handleReceiveMessage(MessagePacket messagePacket) { - Message message = messagePacket.getMessage(); - // We are modifying the key set so we iterate over a shallow copy. - for (SimpleString property : new HashSet<>(message.getPropertyNames())) { - if (dictionary.containsKey(property)) { - message.putObjectProperty(dictionary.get(property), message.removeProperty(property)); - } + private void handleReceiveMessage(MessagePacketI messagePacket) { + if (replaceHQ) { + HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); + } + else { + HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); } } - private boolean isMessagePacket(Packet packet) { - int type = packet.getType(); - return type == PacketImpl.SESS_SEND || - type == PacketImpl.SESS_SEND_CONTINUATION || - type == PacketImpl.SESS_SEND_LARGE || - type == PacketImpl.SESS_RECEIVE_LARGE_MSG || - type == PacketImpl.SESS_RECEIVE_MSG; - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java index 3d6dab5..bd4274a 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.protocol.hornetq; +import java.nio.charset.StandardCharsets; +import java.util.List; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager; @@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import java.nio.charset.StandardCharsets; -import java.util.List; - /** * HornetQ Protocol Manager */ @@ -54,6 +54,12 @@ class HornetQProtocolManager extends CoreProtocolManager { } @Override + public boolean acceptsNoHandshake() { + return true; + } + + + @Override public boolean isProtocol(byte[] array) { String frameStart = new String(array, StandardCharsets.US_ASCII); return frameStart.startsWith("HORNETQ"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java index a163459..33c9a78 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java @@ -34,9 +34,8 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory { public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor(); - incomingInterceptors.add(propertyConversionInterceptor); - outgoingInterceptors.add(propertyConversionInterceptor); + incomingInterceptors.add(new HQPropertiesConversionInterceptor(true)); + outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false)); return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java new file mode 100644 index 0000000..a1d9a60 --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.version.Version; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; + +public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager { + + private static final int VERSION_PLAYED = 123; + protected void sendHandshake(Connection transportConnection) { + } + + + protected SessionContext newSessionContext(String name, + int confirmationWindowSize, + Channel sessionChannel, + CreateSessionResponseMessage response) { + // these objects won't be null, otherwise it would keep retrying on the previous loop + return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); + } + + @Override + protected Packet newCreateSessionPacket(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize, + long sessionChannelID) { + return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + } + + @Override + public void sendSubscribeTopology(final boolean isServer) { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java new file mode 100644 index 0000000..ed57cfe --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; + +public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { + + + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true)); + locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false)); + } + + @Override + public ClientProtocolManager newProtocolManager() { + return new HornetQClientProtocolManager(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java new file mode 100644 index 0000000..169a82a --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; + +public class HornetQClientSessionContext extends ActiveMQSessionContext { + + public HornetQClientSessionContext(String name, + RemotingConnection remotingConnection, + Channel sessionChannel, + int serverVersion, + int confirmationWindow) { + super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow); + } + + + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + return response.toQueueQuery(); + } + + protected CreateSessionMessage newCreateSession(String username, + String password, + int minLargeMessageSize, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + SimpleString defaultAddress) { + return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); + } + + + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { + SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false); + } + + public ClientConsumerInternal createConsumer(SimpleString queueName, + SimpleString filterString, + int windowSize, + int maxRate, + int ackBatchSize, + boolean browseOnly, + Executor executor, + Executor flowControlExecutor) throws ActiveMQException { + long consumerID = idGenerator.generateID(); + + ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); + + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); + + SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + // The actual windows size that gets used is determined by the user since + // could be overridden on the queue settings + // The value we send is just a hint + + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java new file mode 100644 index 0000000..9240e55 --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public class HQPropertiesConverter { + + private static Map hqAmqDictionary; + private static Map amqHqDictionary; + + static { + Map d = new HashMap(); + + // Add entries for outgoing messages + d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); + d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); + d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); + d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); + d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); + d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); + d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); + d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); + d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); + d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); + + hqAmqDictionary = Collections.unmodifiableMap(d); + + d = new HashMap<>(); + // inverting the direction + for (Map.Entry entry: hqAmqDictionary.entrySet()) { + d.put(entry.getValue(), entry.getKey()); + } + + amqHqDictionary = Collections.unmodifiableMap(d); + } + + public static void replaceAMQProperties(final Message message) { + replaceDict(message, amqHqDictionary); + } + + public static void replaceHQProperties(final Message message) { + replaceDict(message, hqAmqDictionary); + } + + private static void replaceDict(final Message message, Map dictionary) { + for (SimpleString property : new HashSet<>(message.getPropertyNames())) { + SimpleString replaceTo = dictionary.get(property); + if (replaceTo != null) { + message.putObjectProperty(replaceTo, message.removeProperty(property)); + } + } + } + + public static boolean isMessagePacket(Packet packet) { + int type = packet.getType(); + return type == PacketImpl.SESS_SEND || + type == PacketImpl.SESS_SEND_LARGE || + type == PacketImpl.SESS_RECEIVE_LARGE_MSG || + type == PacketImpl.SESS_RECEIVE_MSG; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index ce75e4d..e99272f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.List; + import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; @@ -32,8 +34,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import java.util.List; - /** * MQTTProtocolManager */ @@ -80,6 +80,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener { } @Override + public boolean acceptsNoHandshake() { + return false; + } + + + @Override public void removeHandler(String name) { // TODO add support for handlers } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 04be7aa..9b35b90 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -150,6 +150,11 @@ public class OpenWireProtocolManager implements ProtocolManager, No } + @Override + public boolean acceptsNoHandshake() { + return false; + } + public ProtocolManagerFactory getFactory() { return factory; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 98d21e4..6dcd351 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -105,6 +105,11 @@ class StompProtocolManager implements ProtocolManager, No } @Override + public boolean acceptsNoHandshake() { + return false; + } + + @Override public ProtocolManagerFactory getFactory() { return factory; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index b4370c9..2a40e01 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -879,6 +879,22 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties.setProducerWindowSize(producerWindowSize); } + public String getProtocolManagerFactoryStr() { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("getProtocolManagerFactoryStr()"); + } + + return raProperties.getProtocolManagerFactoryStr(); + } + + public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("setProtocolManagerFactoryStr(" + protocolManagerFactoryStr + ")"); + } + + raProperties.setProtocolManagerFactoryStr(protocolManagerFactoryStr); + } + /** * Get min large message size * @@ -1971,6 +1987,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { if (val5 != null) { cf.setConnectionLoadBalancingPolicyClassName(val5); } + val5 = overrideProperties.getProtocolManagerFactoryStr() != null ? overrideProperties.getProtocolManagerFactoryStr() : raProperties.getProtocolManagerFactoryStr(); + if (val5 != null) { + cf.setProtocolManagerFactoryStr(val5); + } } public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activeMQRAManagedConnectionFactory) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index 9edd1d8..2137186 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -118,6 +118,8 @@ public class ConnectionFactoryProperties { private String groupID; + private String protocolManagerFactoryStr; + /** * @return the transportType */ @@ -679,6 +681,14 @@ public class ConnectionFactoryProperties { hasBeenUpdated = true; } + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + + public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) { + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + } + public boolean isHasBeenUpdated() { return hasBeenUpdated; } @@ -890,6 +900,12 @@ public class ConnectionFactoryProperties { } else if (!this.producerWindowSize.equals(other.producerWindowSize)) return false; + else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr)) + return false; + if (this.protocolManagerFactoryStr == null) { + if (other.protocolManagerFactoryStr != null) + return false; + } if (this.reconnectAttempts == null) { if (other.reconnectAttempts != null) return false; @@ -971,6 +987,7 @@ public class ConnectionFactoryProperties { result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode()); result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode()); result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode()); + result = prime * result + ((protocolManagerFactoryStr == null) ? 0 : protocolManagerFactoryStr.hashCode()); result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode()); result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode()); result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index ed9edb1..4a69128 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -257,6 +257,15 @@ public interface Configuration { Configuration addAcceptorConfiguration(final TransportConfiguration infos); + /** + * Add an acceptor to the config + * @param name the name of the acceptor + * @param uri the URI of the acceptor + * @return this + * @throws Exception in case of Parsing errors on the URI + */ + Configuration addAcceptorConfiguration(String name, String uri) throws Exception; + Configuration clearAcceptorConfigurations(); /** @@ -271,6 +280,8 @@ public interface Configuration { Configuration addConnectorConfiguration(final String key, final TransportConfiguration info); + Configuration addConnectorConfiguration(final String name, final String uri) throws Exception; + Configuration clearConnectorConfigurations(); /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8b21f8a..b8b4c2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -51,6 +51,8 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; +import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser; +import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser; import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader; public class ConfigurationImpl implements Configuration, Serializable { @@ -337,6 +339,19 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception { + + AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser(); + + List configurations = parser.newObject(parser.expandURI(uri), name); + + for (TransportConfiguration config : configurations) { + addAcceptorConfiguration(config); + } + + return this; + } + public ConfigurationImpl clearAcceptorConfigurations() { acceptorConfigs.clear(); return this; @@ -356,6 +371,21 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + + public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception { + + ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser(); + + List configurations = parser.newObject(parser.expandURI(uri), name); + + for (TransportConfiguration config : configurations) { + addConnectorConfiguration(name, config); + } + + return this; + } + + public ConfigurationImpl clearConnectorConfigurations() { connectorConfigs.clear(); return this; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index e332887..78a5a62 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -154,7 +154,15 @@ public class ProtocolHandler { //if we get here we assume we use the core protocol as we match nothing else if (protocolToUse == null) { - protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL; + for (Map.Entry entry : protocolMap.entrySet()) { + if (entry.getValue().acceptsNoHandshake()) { + protocolToUse = entry.getKey(); + break; + } + } + if (protocolToUse == null) { + protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL; + } } ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse); ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/206acdac/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index da6e5b1..6295ed6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Pair; @@ -98,6 +98,11 @@ public class CoreProtocolManager implements ProtocolManager { this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing)); } + @Override + public boolean acceptsNoHandshake() { + return false; + } + /** * no need to implement this now * @@ -162,23 +167,25 @@ public class CoreProtocolManager implements ProtocolManager { @Override public boolean isProtocol(byte[] array) { - String frameStart = new String(array, StandardCharsets.US_ASCII); - return frameStart.startsWith("ACTIVEMQ"); + return isArtemis(ActiveMQBuffers.wrappedBuffer(array)); } @Override public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { //if we are not an old client then handshake - if (buffer.getByte(0) == 'A' && + if (isArtemis(buffer)) { + buffer.readBytes(7); + } + } + + private boolean isArtemis(ActiveMQBuffer buffer) { + return buffer.getByte(0) == 'A' && buffer.getByte(1) == 'R' && buffer.getByte(2) == 'T' && buffer.getByte(3) == 'E' && buffer.getByte(4) == 'M' && buffer.getByte(5) == 'I' && - buffer.getByte(6) == 'S') { - //todo add some handshaking - buffer.readBytes(7); - } + buffer.getByte(6) == 'S'; } @Override