activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
Date Wed, 06 Sep 2017 10:59:16 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 125bd41f9 -> f8ccb6d31


ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/16dfd777
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/16dfd777
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/16dfd777

Branch: refs/heads/master
Commit: 16dfd777b85197786675a34286e58da321ab22c0
Parents: 125bd41
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Sep 5 17:27:27 2017 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Wed Sep 6 11:21:56 2017 +0100

----------------------------------------------------------------------
 .../activemq/artemis/api/core/ICoreMessage.java |  2 +-
 .../activemq/artemis/api/core/Message.java      |  2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java  |  8 ++---
 .../core/protocol/mqtt/MQTTSessionCallback.java |  4 +--
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  4 +++
 .../impl/openmbean/OpenTypeSupport.java         |  2 +-
 .../integration/mqtt/imported/MQTTTest.java     | 37 ++++++++++++++++++++
 7 files changed, 49 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index f49ef68..fe2044b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -77,7 +77,7 @@ public interface ICoreMessage extends Message {
          map.put("userID", "ID:" + userID.toString());
       }
 
-      map.put("address", getAddress());
+      map.put("address", getAddress() == null ? "" : getAddress());
       map.put("type", getType());
       map.put("durable", isDurable());
       map.put("expiration", getExpiration());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 58433ce..61d887e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -616,7 +616,7 @@ public interface Message {
          map.put("userID", "ID:" + userID.toString());
       }
 
-      map.put("address", getAddress());
+      map.put("address", getAddress() == null ? "" : getAddress());
       map.put("durable", isDurable());
       map.put("expiration", getExpiration());
       map.put("timestamp", getTimestamp());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e23385c..eb9c631 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -24,12 +24,12 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.EmptyByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -112,7 +112,7 @@ public class MQTTPublishManager {
     * to original ID and consumer in the Session state.  This way we can look up the consumer Id and the message Id from
     * the PubAck or PubRec message id. *
     */
-   protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+   protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
       if (isManagementConsumer(consumer)) {
          sendPubRelMessage(message);
@@ -257,8 +257,8 @@ public class MQTTPublishManager {
       }
    }
 
-   private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
-      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
+   private void sendServerMessage(int messageId, ICoreMessage message, int deliveryCount, int qos) {
+      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress().toString(), session.getWildcardConfiguration());
       boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
 
       ByteBuf payload;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index a5b908f..21b1f2b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,8 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -49,7 +47,7 @@ public class MQTTSessionCallback implements SessionCallback {
                           ServerConsumer consumer,
                           int deliveryCount) {
       try {
-         session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
+         session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
       } catch (Exception e) {
          log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 76664f6..098a756 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -84,6 +84,10 @@ public class MQTTUtil {
    private static final MQTTLogger logger = MQTTLogger.LOGGER;
 
    public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
+      if (filter == null) {
+         return "";
+      }
+
       if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index e7df48b..0c781b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -137,7 +137,7 @@ public final class OpenTypeSupport {
          } else {
             rc.put(CompositeDataConstants.USER_ID, "");
          }
-         rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString());
+         rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
          rc.put(CompositeDataConstants.TYPE, m.getType());
          rc.put(CompositeDataConstants.DURABLE, m.isDurable());
          rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 794b002..e3c4856 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -25,6 +25,7 @@ import javax.jms.Session;
 import java.io.EOFException;
 import java.lang.reflect.Field;
 import java.net.ProtocolException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -48,6 +49,11 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.MQTTException;
@@ -71,6 +77,8 @@ public class MQTTTest extends MQTTTestSupport {
 
    private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
+   private static final String AMQP_URI = "tcp://localhost:61616";
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -1162,6 +1170,35 @@ public class MQTTTest extends MQTTTestSupport {
       doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
    }
 
+   @Test(timeout = 60 * 1000)
+   public void testLinkRouteAmqpReceiveMQTT() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(AMQP_URI), null, null);
+      AmqpConnection connection = client.connect();
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender("test", true);
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         sender.send(message);
+         sender.close();
+      } finally {
+         connection.close();
+      }
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("TestClient");
+      BlockingConnection blockingConnection = mqtt.blockingConnection();
+      try {
+         blockingConnection.connect();
+         Topic t = new Topic("test", QoS.AT_LEAST_ONCE);
+         blockingConnection.subscribe(new Topic[] {t});
+         assertNotNull(blockingConnection.receive(5, TimeUnit.SECONDS));
+      } finally {
+         blockingConnection.kill();
+      }
+   }
+
    public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
       final MQTTClientProvider provider = getMQTTClientProvider();
       initializeConnection(provider);


Mime
View raw message