activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-900 fix compatibility after address changes
Date Thu, 22 Dec 2016 15:25:57 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master a3490cad2 -> 22f0fcf08


ARTEMIS-900 fix compatibility after address changes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b9a7e152
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b9a7e152
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b9a7e152

Branch: refs/heads/master
Commit: b9a7e152f7fa8ce0fb5edcfed4ab6b92a0b6aff4
Parents: a3490ca
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Dec 21 17:31:30 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Dec 21 19:40:35 2016 -0500

----------------------------------------------------------------------
 .../core/protocol/core/impl/PacketImpl.java     | 22 +++++++
 .../impl/wireformat/QueueAbstractPacket.java    | 60 ++++++++++++++++++++
 .../wireformat/SessionBindingQueryMessage.java  |  9 +--
 .../SessionCreateConsumerMessage.java           |  9 +--
 .../wireformat/SessionQueueQueryMessage.java    |  9 +--
 .../artemis/api/core/QueueAbstractTest.java     | 48 ++++++++++++++++
 .../artemis/jms/client/ActiveMQSession.java     | 54 +++++++++++-------
 .../core/ServerSessionPacketHandler.java        | 10 ++--
 .../core/impl/ActiveMQPacketHandler.java        | 14 ++++-
 .../core/server/impl/ServerSessionImpl.java     | 11 +++-
 10 files changed, 193 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 96cde97..646eb28 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -24,6 +25,11 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class PacketImpl implements Packet {
    // Constants -------------------------------------------------------------------------
 
+   public static final int ADDRESSING_CHANGE_VERSION = 129;
+
+   public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
+   public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
+
    // The minimal size for all the packets, Common data for all the packets (look at
    // PacketImpl.encode)
    public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
@@ -267,6 +273,20 @@ public class PacketImpl implements Packet {
 
    // Public --------------------------------------------------------
 
+   public SimpleString convertName(SimpleString name) {
+      if (name == null) {
+         return null;
+      }
+
+      if (name.startsWith(OLD_QUEUE_PREFIX)) {
+         return name.subSeq(OLD_QUEUE_PREFIX.length(), name.length());
+      } else if (name.startsWith(OLD_TOPIC_PREFIX)) {
+         return name.subSeq(OLD_TOPIC_PREFIX.length(), name.length());
+      } else {
+         return name;
+      }
+   }
+
    @Override
    public byte getType() {
       return type;
@@ -376,4 +396,6 @@ public class PacketImpl implements Packet {
    protected int nullableStringEncodeSize(final String str) {
       return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java
new file mode 100644
index 0000000..57b72cd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+public abstract class QueueAbstractPacket extends PacketImpl {
+
+   protected SimpleString queueName;
+   protected SimpleString oldVersionQueueName;
+
+   protected SimpleString address;
+   protected SimpleString oldVersionAddresseName;
+
+   public SimpleString getQueueName(int clientVersion) {
+
+      if (clientVersion < ADDRESSING_CHANGE_VERSION) {
+         if (oldVersionQueueName == null) {
+            oldVersionQueueName = convertName(queueName);
+         }
+
+         return oldVersionQueueName;
+      } else {
+         return queueName;
+      }
+   }
+
+   public SimpleString getAddress(int clientVersion) {
+
+      if (clientVersion < ADDRESSING_CHANGE_VERSION) {
+         if (oldVersionAddresseName == null) {
+            oldVersionAddresseName = convertName(address);
+         }
+
+         return oldVersionAddresseName;
+      } else {
+         return address;
+      }
+   }
+
+   public QueueAbstractPacket(byte type) {
+      super(type);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java
index 0bb06e2..d33b884 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java
@@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class SessionBindingQueryMessage extends PacketImpl {
-
-   private SimpleString address;
+public class SessionBindingQueryMessage extends QueueAbstractPacket {
 
    public SessionBindingQueryMessage(final SimpleString address) {
       super(SESS_BINDINGQUERY);
@@ -34,10 +31,6 @@ public class SessionBindingQueryMessage extends PacketImpl {
       super(SESS_BINDINGQUERY);
    }
 
-   public SimpleString getAddress() {
-      return address;
-   }
-
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
index afff162..f09beeb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
@@ -18,14 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class SessionCreateConsumerMessage extends PacketImpl {
+public class SessionCreateConsumerMessage extends QueueAbstractPacket {
 
    private long id;
 
-   private SimpleString queueName;
-
    private SimpleString filterString;
 
    private boolean browseOnly;
@@ -66,10 +63,6 @@ public class SessionCreateConsumerMessage extends PacketImpl {
       return id;
    }
 
-   public SimpleString getQueueName() {
-      return queueName;
-   }
-
    public SimpleString getFilterString() {
       return filterString;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java
index 172f29f..1b8c526 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java
@@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class SessionQueueQueryMessage extends PacketImpl {
-
-   private SimpleString queueName;
+public class SessionQueueQueryMessage extends QueueAbstractPacket {
 
    public SessionQueueQueryMessage(final SimpleString queueName) {
       super(SESS_QUEUEQUERY);
@@ -34,10 +31,6 @@ public class SessionQueueQueryMessage extends PacketImpl {
       super(SESS_QUEUEQUERY);
    }
 
-   public SimpleString getQueueName() {
-      return queueName;
-   }
-
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java
new file mode 100644
index 0000000..e75c184
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QueueAbstractPacket;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueueAbstractTest {
+
+   class MyTest extends QueueAbstractPacket {
+
+      MyTest(String name) {
+         super((byte)0);
+         this.queueName = SimpleString.toSimpleString(name);
+      }
+   }
+
+
+   @Test
+   public void testOldTopic() {
+      MyTest test = new MyTest("jms.topic.mytopic");
+
+      Assert.assertEquals("mytopic", test.getQueueName(127).toString());
+   }
+
+   @Test
+   public void testOldQueue() {
+      MyTest test = new MyTest("jms.queue.myQueue");
+
+      Assert.assertEquals("myQueue", test.getQueueName(127).toString());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 024b8a2..4b89817 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -367,19 +367,26 @@ public class ActiveMQSession implements QueueSession, TopicSession {
       }
 
       try {
-         ActiveMQQueue queue = lookupQueue(queueName, false);
+         return internalCreateQueue(queueName, false);
+      } catch (ActiveMQException e) {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
 
-         if (queue == null) {
-            queue = lookupQueue(queueName, true);
-         }
+   protected Queue internalCreateQueue(String queueName, final boolean retry) throws ActiveMQException, JMSException {
+      ActiveMQQueue queue = lookupQueue(queueName, false);
 
-         if (queue == null) {
-            throw new JMSException("There is no queue with name " + queueName);
-         } else {
-            return queue;
+      if (queue == null) {
+         queue = lookupQueue(queueName, true);
+      }
+
+      if (queue == null) {
+         if (!retry) {
+            return internalCreateQueue("jms.queue." + queueName, true);
          }
-      } catch (ActiveMQException e) {
-         throw JMSExceptionHelper.convertFromActiveMQException(e);
+         throw new JMSException("There is no queue with name " + queueName);
+      } else {
+         return queue;
       }
    }
 
@@ -391,19 +398,26 @@ public class ActiveMQSession implements QueueSession, TopicSession {
       }
 
       try {
-         ActiveMQTopic topic = lookupTopic(topicName, false);
+         return internalCreateTopic(topicName, false);
+      } catch (ActiveMQException e) {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
 
-         if (topic == null) {
-            topic = lookupTopic(topicName, true);
-         }
+   protected Topic internalCreateTopic(String topicName, boolean retry) throws ActiveMQException, JMSException {
+      ActiveMQTopic topic = lookupTopic(topicName, false);
 
-         if (topic == null) {
-            throw new JMSException("There is no topic with name " + topicName);
-         } else {
-            return topic;
+      if (topic == null) {
+         topic = lookupTopic(topicName, true);
+      }
+
+      if (topic == null) {
+         if (!retry) {
+            return internalCreateTopic("jms.topic." + topicName, true);
          }
-      } catch (ActiveMQException e) {
-         throw JMSExceptionHelper.convertFromActiveMQException(e);
+         throw new JMSException("There is no topic with name " + topicName);
+      } else {
+         return topic;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index acb1c4c..be2f04d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -150,7 +150,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       this.remotingConnection = channel.getConnection();
 
-      //TODO think of a better way of doing this
       Connection conn = remotingConnection.getTransportConnection();
 
       if (conn instanceof NettyConnection) {
@@ -215,11 +214,12 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_CREATECONSUMER: {
                   SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
                   requiresResponse = request.isRequiresResponse();
-                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
+                  session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly());
                   if (requiresResponse) {
                      // We send back queue information on the queue as a response- this allows the queue to
                      // be automatically recreated on failover
-                     QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
+                     QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
+
                      if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
                         response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
                      } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
@@ -287,7 +287,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_QUEUEQUERY: {
                   requiresResponse = true;
                   SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
-                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
+                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
                   if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
                      response = new SessionQueueQueryResponseMessage_V3(result);
                   } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
@@ -300,7 +300,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_BINDINGQUERY: {
                   requiresResponse = true;
                   SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
-                  BindingQueryResult result = session.executeBindingQuery(request.getAddress());
+                  BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion()));
                   if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
                      response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultDeleteOnNoConsumers(), result.getDefaultMaxConsumers());
                   } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 1034919..31ab624 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
@@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.version.Version;
 import org.jboss.logging.Logger;
@@ -153,7 +157,15 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
          OperationContext sessionOperationContext = server.newOperationContext();
 
-         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, protocolManager.getPrefixes());
+         Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
+
+         if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
+            routingTypeMap = new HashMap<>();
+            routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
+            routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
+         }
+
+         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
          channel.setHandler(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b9a7e152/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6d4c588..ea67c56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1330,6 +1330,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       SimpleString address = removePrefix(message.getAddress());
 
+      // In case the prefix was removed, we also need to update the message
+      if (address != message.getAddress()) {
+         message.setAddress(address);
+      }
+
       if (defaultAddress == null && address != null) {
          defaultAddress = address;
       }
@@ -1349,12 +1354,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
       }
 
-      if (address == null) {
+      if (message.getAddress() == null) {
          // This could happen with some tests that are ignoring messages
          throw ActiveMQMessageBundle.BUNDLE.noAddress();
       }
 
-      if (address.equals(managementAddress)) {
+      if (message.getAddress().equals(managementAddress)) {
          // It's a management message
 
          handleManagementMessage(tx, message, direct);
@@ -1733,7 +1738,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public SimpleString removePrefix(SimpleString address) {
-      if (prefixEnabled) {
+      if (prefixEnabled && address != null) {
          return PrefixUtil.getAddress(address, prefixes);
       }
       return address;


Mime
View raw message