activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5092 - apply patch from Dhiraj Bokde with thanks
Date Wed, 12 Mar 2014 13:00:55 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 6aaf859d2 -> 67f151fe0


https://issues.apache.org/jira/browse/AMQ-5092 - apply patch from Dhiraj Bokde with thanks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/67f151fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/67f151fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/67f151fe

Branch: refs/heads/trunk
Commit: 67f151fe0bca5f69a912fe0e707c528ae8c450b6
Parents: 6aaf859
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 12 13:00:38 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 12 13:00:38 2014 +0000

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  61 +++++-
 .../transport/mqtt/MQTTSubscription.java        |  10 +-
 .../activemq/transport/mqtt/MQTTTest.java       | 194 ++++++++++++++++++-
 3 files changed, 258 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 19614a9..1f912b7 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -57,6 +57,7 @@ public class MQTTProtocolConverter {
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new
ConcurrentHashMap<Integer, ResponseHandler>();
@@ -66,6 +67,9 @@ public class MQTTProtocolConverter {
     private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination,
UTF8Buffer>(DEFAULT_CACHE_SIZE);
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
+    private final Map<String, Short> activemqToPacketIds = new LRUCache<String,
Short>(DEFAULT_CACHE_SIZE);
+    private final Map<Short, String> packetIdsToActivemq = new LRUCache<Short, String>(DEFAULT_CACHE_SIZE);
+
     private final MQTTTransport mqttTransport;
     private final BrokerService brokerService;
 
@@ -348,10 +352,15 @@ public class MQTTProtocolConverter {
                             PUBLISH retainedCopy = new PUBLISH();
                             retainedCopy.topicName(msg.topicName());
                             retainedCopy.retain(msg.retain());
-                            retainedCopy.messageId(msg.messageId());
                             retainedCopy.payload(msg.payload());
                             // set QoS of retained message to maximum of subscription QoS
                             retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]]
: msg.qos());
+                            switch (retainedCopy.qos()) {
+                                case AT_LEAST_ONCE:
+                                case EXACTLY_ONCE:
+                                    retainedCopy.messageId(getNextSequenceId());
+                                case AT_MOST_ONCE:
+                            }
                             getMQTTTransport().sendToMQTT(retainedCopy.encode());
                         } catch (IOException e) {
                             LOG.warn("Couldn't send retained message " + msg, e);
@@ -446,6 +455,12 @@ public class MQTTProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.createMessageAck(md);
                 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
+                switch (publish.qos()) {
+                    case AT_LEAST_ONCE:
+                    case EXACTLY_ONCE:
+                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
+                    case AT_MOST_ONCE:
+                }
                 if (ack != null && sub.expectAck(publish)) {
                     synchronized (consumerAcks) {
                         consumerAcks.put(publish.messageId(), ack);
@@ -480,6 +495,7 @@ public class MQTTProtocolConverter {
 
     void onMQTTPubAck(PUBACK command) {
         short messageId = command.messageId();
+        ackPacketId(messageId);
         MessageAck ack;
         synchronized (consumerAcks) {
             ack = consumerAcks.remove(messageId);
@@ -511,6 +527,7 @@ public class MQTTProtocolConverter {
 
     void onMQTTPubComp(PUBCOMP command) {
         short messageId = command.messageId();
+        ackPacketId(messageId);
         MessageAck ack;
         synchronized (consumerAcks) {
             ack = consumerAcks.remove(messageId);
@@ -524,7 +541,7 @@ public class MQTTProtocolConverter {
         ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
 
         msg.setProducerId(producerId);
-        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
         msg.setMessageId(id);
         msg.setTimestamp(System.currentTimeMillis());
         msg.setPriority((byte) Message.DEFAULT_PRIORITY);
@@ -547,8 +564,7 @@ public class MQTTProtocolConverter {
 
     public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException,
DataFormatException {
         PUBLISH result = new PUBLISH();
-        short id = (short) message.getMessageId().getProducerSequenceId();
-        result.messageId(id);
+        // packet id is set in MQTTSubscription
         QoS qoS;
         if (message.propertyExists(QOS_PROPERTY_NAME)) {
             int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
@@ -623,7 +639,7 @@ public class MQTTProtocolConverter {
                     PUBLISH publish = new PUBLISH();
                     publish.topicName(connect.willTopic());
                     publish.qos(connect.willQos());
-                    publish.messageId((short) messageIdGenerator.getNextSequenceId());
+                    publish.messageId(getNextSequenceId());
                     publish.payload(connect.willMessage());
                     ActiveMQMessage message = convertMessage(publish);
                     message.setProducerId(producerId);
@@ -815,4 +831,39 @@ public class MQTTProtocolConverter {
     public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
         this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
     }
+
+    short setPacketId(MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish)
{
+        // subscription key
+        final StringBuilder subscriptionKey = new StringBuilder();
+        subscriptionKey.append(subscription.getConsumerInfo().getDestination().getPhysicalName())
+            .append(':').append(message.getJMSMessageID());
+        final String keyStr = subscriptionKey.toString();
+        Short packetId;
+        synchronized (activemqToPacketIds) {
+            packetId = activemqToPacketIds.get(keyStr);
+            if (packetId == null) {
+                packetId = getNextSequenceId();
+                activemqToPacketIds.put(keyStr, packetId);
+                packetIdsToActivemq.put(packetId, keyStr);
+            } else {
+                // mark publish as duplicate!
+                publish.dup(true);
+            }
+        }
+        publish.messageId(packetId);
+        return packetId;
+    }
+
+    void ackPacketId(short packetId) {
+        synchronized (activemqToPacketIds) {
+            final String subscriptionKey = packetIdsToActivemq.remove(packetId);
+            if (subscriptionKey != null) {
+                activemqToPacketIds.remove(subscriptionKey);
+            }
+        }
+    }
+
+    short getNextSequenceId() {
+        return (short) messageIdGenerator.getNextSequenceId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index 99af92a..0eed8f6 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -18,8 +18,8 @@ package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 import java.util.zip.DataFormatException;
-
 import javax.jms.JMSException;
+
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerInfo;
@@ -53,6 +53,13 @@ class MQTTSubscription {
         if (publish.qos().ordinal() > this.qos.ordinal()) {
             publish.qos(this.qos);
         }
+        switch (publish.qos()) {
+            case AT_LEAST_ONCE:
+            case EXACTLY_ONCE:
+                // set packet id, and optionally dup flag
+                protocolConverter.setPacketId(this, message, publish);
+            case AT_MOST_ONCE:
+        }
         return publish;
     }
 
@@ -71,4 +78,5 @@ class MQTTSubscription {
     public QoS qos() {
         return qos;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/67f151fe/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 9ece80e..76d1597 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -30,12 +31,14 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
+import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
@@ -512,6 +515,195 @@ public class MQTTTest extends AbstractMQTTTest {
 
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testUniqueMessageIds() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setKeepAlive((short)2);
+        mqtt.setCleanSession(true);
+
+        final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client received:\n" + frame);
+                if (frame.messageType() == PUBLISH.TYPE) {
+                    PUBLISH publish = new PUBLISH();
+                    try {
+                        // copy the buffers before we decode
+                        Buffer[] buffers = frame.buffers();
+                        Buffer[] copy = new Buffer[buffers.length];
+                        for (int i = 0; i < buffers.length; i++) {
+                            copy[i] = buffers[i].deepCopy();
+                        }
+                        publish.decode(frame);
+                        // reset frame buffers to deep copy
+                        frame.buffers(copy);
+                    } catch (ProtocolException e) {
+                        fail("Error decoding publish " + e.getMessage());
+                    }
+                    publishList.add(publish);
+                }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client sent:\n" + frame);
+            }
+        });
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        // create overlapping subscriptions with different QoSs
+        QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
+        final String TOPIC = "TopicA/";
+
+        // publish retained message
+        connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+        String[] subs = {TOPIC, "TopicA/#", "TopicA/+"};
+        for (int i = 0; i < qoss.length; i++) {
+            connection.subscribe(new Topic[]{ new Topic(subs[i], qoss[i]) });
+        }
+
+        // publish non-retained message
+        connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        int received = 0;
+
+        Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        do {
+            assertNotNull(msg);
+            assertEquals(TOPIC, new String(msg.getPayload()));
+            msg.ack();
+            int waitCount = 0;
+            while (publishList.size() <= received && waitCount < 10) {
+                Thread.sleep(1000);
+                waitCount++;
+            }
+            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        } while (msg != null && received++ < subs.length * 2);
+        assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
+
+        // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE
+        for (int i = 0; i < publishList.size(); i++) {
+            for (int j = i + 1; j < publishList.size(); j++) {
+                final PUBLISH publish1 = publishList.get(i);
+                final PUBLISH publish2 = publishList.get(j);
+                boolean qos0 = false;
+                if (publish1.qos() == QoS.AT_MOST_ONCE) {
+                    qos0 = true;
+                    assertEquals(0, publish1.messageId());
+                }
+                if (publish2.qos() == QoS.AT_MOST_ONCE) {
+                    qos0 = true;
+                    assertEquals(0, publish2.messageId());
+                }
+                if (!qos0) {
+                    assertNotEquals(publish1.messageId(), publish2.messageId());
+                }
+            }
+        }
+
+        connection.unsubscribe(subs);
+        connection.disconnect();
+    }
+
+    @Test(timeout = 600 * 1000)
+    public void testResendMessageId() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setKeepAlive((short)2);
+        mqtt.setCleanSession(true);
+
+        final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client received:\n" + frame);
+                if (frame.messageType() == PUBLISH.TYPE) {
+                    PUBLISH publish = new PUBLISH();
+                    try {
+                        // copy the buffers before we decode
+                        Buffer[] buffers = frame.buffers();
+                        Buffer[] copy = new Buffer[buffers.length];
+                        for (int i = 0; i < buffers.length; i++) {
+                            copy[i] = buffers[i].deepCopy();
+                        }
+                        publish.decode(frame);
+                        // reset frame buffers to deep copy
+                        frame.buffers(copy);
+                    } catch (ProtocolException e) {
+                        fail("Error decoding publish " + e.getMessage());
+                    }
+                    publishList.add(publish);
+                }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client sent:\n" + frame);
+            }
+        });
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        // create overlapping subscriptions with different QoSs
+        final String TOPIC = "TopicA/";
+        final String[] subs = { TOPIC, "+/"};
+        connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1],
QoS.EXACTLY_ONCE)});
+
+        // publish non-retained message
+        connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+
+        Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+
+        // drop subs without acknowledging messages, then subscribe and receive again
+        connection.unsubscribe(subs);
+        connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1],
QoS.EXACTLY_ONCE)});
+        // wait for all acks to be processed
+        Thread.sleep(1000);
+
+        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+        msg.ack();
+        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+        msg.ack();
+
+        // make sure we received duplicate message ids
+        for (int i = 0; i < publishList.size(); i++) {
+            boolean found = false;
+            for (int j = 0; j < publishList.size(); j++) {
+                if (i != j) {
+                    if (publishList.get(i).messageId() == publishList.get(j).messageId())
{
+                        // one of them is a duplicate
+                        assertTrue(publishList.get(i).dup() || publishList.get(j).dup());
+                        found = true;
+                    }
+                }
+            }
+            assertTrue("Dup Not found " + publishList.get(i), found);
+        }
+
+        connection.unsubscribe(subs);
+        connection.disconnect();
+    }
+
     @Test(timeout=60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
         addMQTTConnector();
@@ -691,7 +883,7 @@ public class MQTTTest extends AbstractMQTTTest {
             payload = message.getPayload();
             String messageContent = new String(payload);
             LOG.info("Received message from topic: " + message.getTopic() +
-                    " Message content: " + messageContent);
+                " Message content: " + messageContent);
             message.ack();
         }
 


Mime
View raw message