activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-917 Only return body of retained message after reboot
Date Mon, 06 Feb 2017 20:00:49 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 36aef22d0 -> ca6beb16c


ARTEMIS-917 Only return body of retained message after reboot


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3900cb0e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3900cb0e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3900cb0e

Branch: refs/heads/master
Commit: 3900cb0ec7305ee1f341687ace6a4fab7469a817
Parents: 36aef22
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Sun Feb 5 19:20:32 2017 +0000
Committer: Justin Bertram <jbertram@apache.org>
Committed: Mon Feb 6 13:33:26 2017 -0600

----------------------------------------------------------------------
 .../artemis/core/message/impl/MessageImpl.java  |  2 ++
 .../core/protocol/mqtt/MQTTPublishManager.java  |  4 ++-
 .../integration/mqtt/imported/MQTTTest.java     | 32 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3900cb0e/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index 1e19817..69e6e65 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -162,8 +162,10 @@ public abstract class MessageImpl implements MessageInternal {
             buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
 
             bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+
             bodyBuffer.readerIndex(BODY_OFFSET);
             bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
+            endOfBodyPosition = other.endOfBodyPosition;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3900cb0e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index c266e76..e6b3345 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.EmptyByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -227,7 +228,8 @@ public class MQTTPublishManager {
                log.warn("Unable to send message: " + message.getMessageID() + " Cause: "
+ e.getMessage());
             }
          default:
-            payload = message.getBodyBufferDuplicate().byteBuf();
+            ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
+            payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
             break;
       }
       session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3900cb0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 39c950a..333ddd3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1746,4 +1746,36 @@ public class MQTTTest extends MQTTTestSupport {
       assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
       assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
    }
+
+   @Test
+   public void testRetainedMessagesAreCorrectlyFormedAfterRestart() throws Exception {
+      String clientId = "testMqtt";
+      String address = "testAddress";
+      String payload = "This is a test message";
+
+      // Create address
+      getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
+
+      // Send MQTT Retain Message
+      Topic[] mqttTopic = new Topic[]{new Topic(address, QoS.AT_LEAST_ONCE)};
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+      connection1.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true);
+
+      getServer().stop(false);
+      getServer().start();
+      waitForServerToStart(getServer());
+
+      MQTT mqtt2 = createMQTTConnection();
+      mqtt2.setClientId(clientId + "2");
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(mqttTopic);
+
+      Message message = connection2.receive();
+      assertEquals(payload, new String(message.getPayload()));
+   }
 }


Mime
View raw message