activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6060 - retain will message
Date Tue, 24 Nov 2015 13:13:44 GMT
Repository: activemq
Updated Branches:
  refs/heads/master d9e22a936 -> 9bc28214d


https://issues.apache.org/jira/browse/AMQ-6060 - retain will message


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9bc28214
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9bc28214
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9bc28214

Branch: refs/heads/master
Commit: 9bc28214d53351a6fa6cf459383d5e449a100d2d
Parents: d9e22a9
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Tue Nov 24 14:13:17 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Tue Nov 24 14:13:32 2015 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |   1 +
 .../activemq/transport/mqtt/MQTTWillTest.java   | 122 +++++++++++++++++++
 2 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9bc28214/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 5998938..2b384e1 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -623,6 +623,7 @@ public class MQTTProtocolConverter {
                         publish.qos(connect.willQos());
                         publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
                         publish.payload(connect.willMessage());
+                        publish.retain(connect.willRetain());
                         ActiveMQMessage message = convertMessage(publish);
                         message.setProducerId(producerId);
                         message.onSend();

http://git-wip-us.apache.org/repos/asf/activemq/blob/9bc28214/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
new file mode 100644
index 0000000..0c81c5a
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTWillTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.fusesource.mqtt.client.*;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class MQTTWillTest extends MQTTTestSupport {
+
+    @Test(timeout = 60 * 1000)
+    public void testWillMessage() throws Exception {
+        BlockingConnection conn1 = null;
+        BlockingConnection conn2 = null;
+        try {
+            MQTT mqtt1 = createMQTTConnection("client1", false);
+            mqtt1.setWillMessage("last will");
+            mqtt1.setWillQos(QoS.AT_LEAST_ONCE);
+            mqtt1.setWillTopic("wills");
+
+            conn1 = mqtt1.blockingConnection();
+            conn1.connect();
+
+            MQTT mqtt2 = createMQTTConnection("client2", false);
+            conn2 = mqtt2.blockingConnection();
+            conn2.connect();
+            conn2.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+
+            conn1.publish("test", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
+
+            Message msg = conn2.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            assertEquals("hello world", new String(msg.getPayload()));
+            assertEquals("test", msg.getTopic());
+
+            conn1.kill();
+
+            msg = conn2.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            assertEquals("last will", new String(msg.getPayload()));
+            assertEquals("wills", msg.getTopic());
+        } finally {
+           if (conn1 != null) {
+               conn1.disconnect();
+           }
+           if (conn2 != null) {
+               conn2.disconnect();
+           }
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testRetainWillMessage() throws Exception {
+        BlockingConnection conn1 = null;
+        BlockingConnection conn2 = null;
+        try {
+            MQTT mqtt1 = createMQTTConnection("client1", false);
+            mqtt1.setWillMessage("last will");
+            mqtt1.setWillQos(QoS.AT_LEAST_ONCE);
+            mqtt1.setWillTopic("wills");
+            mqtt1.setWillRetain(true);
+
+            conn1 = mqtt1.blockingConnection();
+            conn1.connect();
+
+            MQTT mqtt2 = createMQTTConnection("client2", false);
+            conn2 = mqtt2.blockingConnection();
+            conn2.connect();
+            conn2.subscribe(new Topic[]{new Topic("#", QoS.AT_MOST_ONCE)});
+
+            conn1.publish("test", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
+
+            Message msg = conn2.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            assertEquals("hello world", new String(msg.getPayload()));
+            assertEquals("test", msg.getTopic());
+            msg.ack();
+
+            conn2.disconnect();
+
+            Thread.sleep(1000);
+            conn1.kill();
+            Thread.sleep(1000);
+
+            conn2 = mqtt2.blockingConnection();
+            conn2.connect();
+            conn2.subscribe(new Topic[]{new Topic("#", QoS.AT_MOST_ONCE)});
+
+            msg = conn2.receive(5, TimeUnit.SECONDS);
+            System.out.println(msg.getTopic() + " " + new String(msg.getPayload()));
+            assertNotNull(msg);
+            assertEquals("last will", new String(msg.getPayload()));
+            assertEquals("wills", msg.getTopic());
+        } finally {
+           if (conn1 != null) {
+               conn1.disconnect();
+           }
+           if (conn2 != null) {
+               conn2.disconnect();
+           }
+        }
+    }
+
+}


Mime
View raw message