activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5828
Date Fri, 05 Jun 2015 15:21:46 GMT
Repository: activemq
Updated Branches:
  refs/heads/master a095e9b9d -> 11da37b99


https://issues.apache.org/jira/browse/AMQ-5828

Fixed the bad default for message persistence, which broke the behavior defined
by the AMQP specification when the durable value is not present in the Header.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11da37b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11da37b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11da37b9

Branch: refs/heads/master
Commit: 11da37b991d9ff6e7419a0ee0c8929a7a8dabced
Parents: a095e9b
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jun 5 11:21:27 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jun 5 11:21:27 2015 -0400

----------------------------------------------------------------------
 .../amqp/message/AMQPRawInboundTransformer.java |  5 ++-
 .../amqp/message/InboundTransformer.java        |  2 +-
 .../transport/amqp/AmqpTransformerTest.java     | 10 ++---
 .../transport/amqp/client/AmqpMessage.java      | 33 ++++++++++++++-
 .../amqp/interop/AmqpSendReceiveTest.java       | 43 ++++++++++++++++++++
 5 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index d60a96b..e1414df 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.message;
 
 import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 
 public class AMQPRawInboundTransformer extends InboundTransformer {
@@ -40,7 +41,9 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
         BytesMessage rc = vendor.createBytesMessage();
         rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
 
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        // We cannot decode the message headers to check so err on the side of caution
+        // and mark all messages as persistent.
+        rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
         rc.setJMSPriority(defaultPriority);
 
         final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index 1310bcd..26d5753 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -51,7 +51,7 @@ public abstract class InboundTransformer {
     String prefixMessageAnnotations = "MA_";
     String prefixFooter = "FT_";
 
-    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
     int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
     long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index 0c2c6f7..b513c1a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -25,6 +25,7 @@ import java.net.URI;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -89,7 +90,7 @@ public class AmqpTransformerTest {
         Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
         assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
-        assertEquals(2, message.getJMSDeliveryMode());
+        assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
         assertEquals(7, message.getJMSPriority());
 
         c.close();
@@ -138,10 +139,9 @@ public class AmqpTransformerTest {
         Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
         assertEquals(0L, messageFormat.longValue());
         assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
-        assertEquals(2, message.getJMSDeliveryMode());
+        assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
-        // should not equal 7 (should equal the default) because "raw" does not map
-        // headers
+        // should not equal 7 (should equal the default) because "raw" does not map headers
         assertEquals(4, message.getJMSPriority());
 
         c.close();
@@ -187,7 +187,7 @@ public class AmqpTransformerTest {
         Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
         assertEquals(0L, messageFormat.longValue());
         assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed);
-        assertEquals(2, message.getJMSDeliveryMode());
+        assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
 
         c.close();
         session.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index d29d620..0acd1c6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.engine.Delivery;
@@ -53,7 +54,6 @@ public class AmqpMessage {
         delivery = null;
 
         message = Proton.message();
-        message.setDurable(true);
     }
 
     /**
@@ -247,6 +247,32 @@ public class AmqpMessage {
     }
 
     /**
+     * Sets the durable header on the outgoing message.
+     *
+     * @param durable
+     *        the boolean durable value to set.
+     */
+    public void setDurable(boolean durable) {
+        checkReadOnly();
+        lazyCreateHeader();
+        getWrappedMessage().setDurable(durable);
+    }
+
+    /**
+     * Checks the durable value in the Message Headers to determine if
+     * the message was sent as a durable Message.
+     *
+     * @return true if the message is marked as being durable.
+     */
+    public boolean isDurable() {
+        if (message.getHeader() == null) {
+            return false;
+        }
+
+        return message.getHeader().getDurable();
+    }
+
+    /**
      * Sets a given application property on an outbound message.
      *
      * @param key
@@ -448,6 +474,11 @@ public class AmqpMessage {
         }
     }
 
+    private void lazyCreateHeader() {
+        if (message.getHeader() == null) {
+            message.setHeader(new Header());
+        }
+    }
     private void lazyCreateProperties() {
         if (message.getProperties() == null) {
             message.setProperties(new Properties());

http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 822edee..29ff954 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.transport.amqp.interop;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.TimeUnit;
 
@@ -154,4 +156,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         receiver1.close();
         receiver2.close();
     }
+
+    @Test(timeout = 60000)
+    public void testMessageDurabliltyFollowsSpec() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        // Create default message that should be sent as non-durable
+        AmqpMessage message1 = new AmqpMessage();
+        message1.setText("Test-Message -> non-durable");
+        message1.setDurable(false);
+        message1.setMessageId("ID:Message:1");
+        sender.send(message1);
+
+        assertEquals(1, queue.getQueueSize());
+        receiver1.flow(1);
+        message1 = receiver1.receive(50, TimeUnit.SECONDS);
+        assertFalse("First message sent should not be durable", message1.isDurable());
+        message1.accept();
+
+        // Create a message that is explicitly marked as durable
+        AmqpMessage message2 = new AmqpMessage();
+        message2.setText("Test-Message -> durable");
+        message2.setDurable(true);
+        message2.setMessageId("ID:Message:2");
+        sender.send(message2);
+
+        assertEquals(1, queue.getQueueSize());
+        receiver1.flow(1);
+        message2 = receiver1.receive(50, TimeUnit.SECONDS);
+        assertTrue("Second message sent should be durable", message2.isDurable());
+        message2.accept();
+
+        sender.close();
+        connection.close();
+    }
 }


Mime
View raw message