activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-917 Only return body of retained message after reboot
Date Fri, 10 Feb 2017 15:31:18 GMT
ARTEMIS-917 Only return body of retained message after reboot

(cherry picked from commit 3900cb0ec7305ee1f341687ace6a4fab7469a817)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f231fe4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f231fe4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f231fe4e

Branch: refs/heads/1.x
Commit: f231fe4e9b4d0720c3bac19d097c1bde17badd46
Parents: d0b568c
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Sun Feb 5 19:20:32 2017 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Feb 10 14:55:24 2017 +0000

----------------------------------------------------------------------
 .../artemis/core/message/impl/MessageImpl.java  |  2 ++
 .../core/protocol/mqtt/MQTTPublishManager.java  |  4 ++-
 .../integration/mqtt/imported/MQTTTest.java     | 30 ++++++++++++++++++++
 .../mqtt/imported/MQTTTestSupport.java          |  4 +++
 4 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index 1e19817..69e6e65 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -162,8 +162,10 @@ public abstract class MessageImpl implements MessageInternal {
             buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
 
             bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+
             bodyBuffer.readerIndex(BODY_OFFSET);
             bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
+            endOfBodyPosition = other.endOfBodyPosition;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 73a7c8e..ca926b9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.EmptyByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -230,7 +231,8 @@ public class MQTTPublishManager {
                log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
             }
          default:
-            payload = message.getBodyBufferDuplicate().byteBuf();
+            ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
+            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 1d6b98d..886edd3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -1613,4 +1614,33 @@ public class MQTTTest extends MQTTTestSupport {
 
       connection.disconnect();
    }
+
+   @Test
+   public void testRetainedMessagesAreCorrectlyFormedAfterRestart() throws Exception {
+      String clientId = "testMqtt";
+      String address = "testAddress";
+      String payload = "This is a test message";
+
+      // Send MQTT Retain Message
+      Topic[] mqttTopic = new Topic[]{new Topic(address, QoS.AT_LEAST_ONCE)};
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+      connection1.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true);
+
+      getServer().stop(false);
+      getServer().start();
+      waitForServerToStart(getServer());
+
+      MQTT mqtt2 = createMQTTConnection();
+      mqtt2.setClientId(clientId + "2");
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(mqttTopic);
+
+      Message message = connection2.receive();
+      assertEquals(payload, new String(message.getPayload()));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f231fe4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 8b85f83..476a562 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -93,6 +93,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       return name.getMethodName();
    }
 
+   public ActiveMQServer getServer() {
+      return server;
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {


Mime
View raw message