Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4C59A186E9 for ; Thu, 4 Jun 2015 19:15:16 +0000 (UTC) Received: (qmail 15919 invoked by uid 500); 4 Jun 2015 19:15:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 15875 invoked by uid 500); 4 Jun 2015 19:15:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 15866 invoked by uid 99); 4 Jun 2015 19:15:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jun 2015 19:15:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1BC8FE10A7; Thu, 4 Jun 2015 19:15:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5303 Date: Thu, 4 Jun 2015 19:15:16 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 928e815a0 -> 47e954d0f https://issues.apache.org/jira/browse/AMQ-5303 Fix for subscription recovery of durable topic subscriptions using default subscription strategy and subscribing to a VirtualTopic instance. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/47e954d0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/47e954d0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/47e954d0 Branch: refs/heads/master Commit: 47e954d0f6409da418edeb4c53597e09ba03fe43 Parents: 928e815 Author: Timothy Bish Authored: Thu Jun 4 15:14:37 2015 -0400 Committer: Timothy Bish Committed: Thu Jun 4 15:14:52 2015 -0400 ---------------------------------------------------------------------- .../MQTTDefaultSubscriptionStrategy.java | 2 +- .../activemq/transport/mqtt/MQTTTest.java | 80 +++----------------- 2 files changed, 12 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/47e954d0/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java index 1b3ac5d..68d6cb9 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java @@ -152,7 +152,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr QoS qoS = QoS.valueOf(split[0]); onSubscribe(new Topic(split[1], qoS)); // mark this durable subscription as restored by Broker - restoredSubs.add(split[1]); + restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1])); } } catch (IOException e) { LOG.warn("Could not restore the MQTT durable subs.", e); http://git-wip-us.apache.org/repos/asf/activemq/blob/47e954d0/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 360ed64..7dbf9c7 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -64,7 +64,6 @@ import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.PUBLISH; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -510,87 +509,28 @@ public class MQTTTest extends MQTTTestSupport { @Test(timeout = 120 * 1000) public void testRetainedMessage() throws Exception { - MQTT mqtt = createMQTTConnection(); - mqtt.setKeepAlive((short) 60); - - final String RETAIN = "RETAIN"; - final String TOPICA = "TopicA"; - - final String[] clientIds = { null, "foo", "durable" }; - for (String clientId : clientIds) { - LOG.info("Testing now with Client ID: {}", clientId); - - mqtt.setClientId(clientId); - mqtt.setCleanSession(!"durable".equals(clientId)); - - BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - - // set retained message and check - connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); - Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull("No retained message for " + clientId, msg); - assertEquals(RETAIN, new String(msg.getPayload())); - msg.ack(); - assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - - // test duplicate subscription - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); - msg = connection.receive(15000, TimeUnit.MILLISECONDS); - assertNotNull("No retained message on duplicate subscription for " + clientId, msg); - assertEquals(RETAIN, new String(msg.getPayload())); - msg.ack(); - assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - connection.unsubscribe(new String[]{TOPICA}); - - // clear retained message and check that we don't receive it - connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); - msg = connection.receive(500, TimeUnit.MILLISECONDS); - assertNull("Retained message not cleared for " + clientId, msg); - connection.unsubscribe(new String[]{TOPICA}); - - // set retained message again and check - connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); - msg = connection.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull("No reset retained message for " + clientId, msg); - assertEquals(RETAIN, new String(msg.getPayload())); - msg.ack(); - assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - - // re-connect and check - connection.disconnect(); - connection = mqtt.blockingConnection(); - connection.connect(); - connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); - msg = connection.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull("No reset retained message for " + clientId, msg); - assertEquals(RETAIN, new String(msg.getPayload())); - msg.ack(); - assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - - connection.unsubscribe(new String[]{TOPICA}); - connection.disconnect(); - } + doTestRetainedMessages("TopicA"); } - @Ignore @Test(timeout = 120 * 1000) public void testRetainedMessageOnVirtualTopics() throws Exception { + doTestRetainedMessages("VirtualTopic/TopicA"); + } + + public void doTestRetainedMessages(String topicName) throws Exception { MQTT mqtt = createMQTTConnection(); mqtt.setKeepAlive((short) 60); final String RETAIN = "RETAIN"; - final String TOPICA = "VirtualTopic/TopicA"; + final String TOPICA = topicName; final String[] clientIds = { null, "foo", "durable" }; for (String clientId : clientIds) { - LOG.info("Testing now with Client ID: {}", clientId); + boolean cleanSession = !"durable".equals(clientId); + LOG.info("Testing now with Client ID: {} clean: {}", clientId, cleanSession); mqtt.setClientId(clientId); - mqtt.setCleanSession(!"durable".equals(clientId)); + mqtt.setCleanSession(cleanSession); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -622,6 +562,7 @@ public class MQTTTest extends MQTTTestSupport { // set retained message again and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); + LOG.info("Performing first subscription"); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull("No reset retained message for " + clientId, msg); @@ -633,6 +574,7 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); connection = mqtt.blockingConnection(); connection.connect(); + LOG.info("Performing second subscription"); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull("No reset retained message for " + clientId, msg);