activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-960 Do not encode Will messages
Date Tue, 14 Feb 2017 17:27:11 GMT
ARTEMIS-960 Do not encode Will messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/59773c28
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/59773c28
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/59773c28

Branch: refs/heads/master
Commit: 59773c284983f7919c044c54f1a6e2039023701f
Parents: 1da69fa
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Feb 13 16:21:57 2017 +0000
Committer: Justin Bertram <jbertram@apache.org>
Committed: Tue Feb 14 11:26:54 2017 -0600

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    |  8 ++-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    | 11 -----
 .../integration/mqtt/imported/MQTTTest.java     | 52 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59773c28/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index c623f3b..062eb41 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -17,9 +17,12 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.nio.charset.Charset;
 import java.util.Set;
 import java.util.UUID;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -75,7 +78,10 @@ public class MQTTConnectionManager {
       session.setIsClean(cleanSession);
 
       if (will) {
-         ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain);
+         byte[] payload = willMessage.getBytes(Charset.forName("UTF-8"));
+         ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(payload.length);
+         buf.writeBytes(payload);
+         ServerMessage w = MQTTUtil.createServerMessageFromByteBuf(session, willTopic, willRetain, willQosLevel, buf);
          session.getSessionState().setWillMessage(w);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59773c28/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index bdc16b1..4819006 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -115,7 +115,6 @@ public class MQTTUtil {
       message.setAddress(address);
       message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
-      // For JMS Consumption
       message.setType(Message.BYTES_TYPE);
       return message;
    }
@@ -133,16 +132,6 @@ public class MQTTUtil {
       return message;
    }
 
-   public static ServerMessage createServerMessageFromString(MQTTSession session,
-                                                             String payload,
-                                                             String topic,
-                                                             int qos,
-                                                             boolean retain) {
-      ServerMessage message = createServerMessage(session, new SimpleString(topic), retain, qos);
-      message.getBodyBuffer().writeString(payload);
-      return message;
-   }
-
    public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
       ServerMessage message = createServerMessage(session, address, false, 1);
       message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59773c28/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index d5050da..d359f2e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.MQTTException;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
@@ -998,6 +999,39 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test(timeout = 60 * 1000)
+   public void testClientConnectionFailureSendsWillMessage() throws Exception {
+      getServer().createQueue(SimpleString.toSimpleString("will"), RoutingType.MULTICAST, SimpleString.toSimpleString("will"), null, true, false);
+
+      MQTT mqtt = createMQTTConnection("1", false);
+      mqtt.setKeepAlive((short) 1);
+      mqtt.setWillMessage("test message");
+      mqtt.setWillTopic("will");
+      mqtt.setWillQos(QoS.AT_LEAST_ONCE);
+
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return connection.isConnected();
+         }
+      });
+
+      MQTT mqtt2 = createMQTTConnection("2", false);
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)});
+
+      // kill transport
+      connection.kill();
+
+      // FIXME Wait for the previous connection to timeout.  This is not required in ActiveMQ.  Needs investigating.
+      Thread.sleep(10000);
+      Message m = connection2.receive(1000, TimeUnit.MILLISECONDS);
+      assertEquals("test message", new String(m.getPayload()));
+   }
+
+   @Test(timeout = 60 * 1000)
    public void testCleanSession() throws Exception {
       final String CLIENTID = "cleansession";
       final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
@@ -1779,4 +1813,22 @@ public class MQTTTest extends MQTTTestSupport {
       Message message = connection2.receive();
       assertEquals(payload, new String(message.getPayload()));
    }
+
+   @Test
+   public void testDuplicateIDReturnsError() throws Exception {
+      String clientId = "clientId";
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      mqtt.blockingConnection().connect();
+
+      MQTTException e = null;
+      try {
+         MQTT mqtt2 = createMQTTConnection();
+         mqtt2.setClientId(clientId);
+         mqtt2.blockingConnection().connect();
+      } catch (MQTTException mqttE) {
+         e = mqttE;
+      }
+      assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
+   }
 }


Mime
View raw message