activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6037
Date Fri, 13 Nov 2015 16:48:17 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x e0e737743 -> 3faf87ba8


https://issues.apache.org/jira/browse/AMQ-6037

Add support for amqp style variants of the ActiveMQ job scheduler
options set in message annotations.
(cherry picked from commit 5d353e241b0ba76ad1a0b42ef5c7a2ae54106860)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3faf87ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3faf87ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3faf87ba

Branch: refs/heads/activemq-5.12.x
Commit: 3faf87ba8a93b1db72dc06c0810140ab6d748b3d
Parents: e0e7377
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Nov 10 18:12:40 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Nov 13 11:48:12 2015 -0500

----------------------------------------------------------------------
 .../amqp/message/InboundTransformer.java        | 23 ++++-
 .../amqp/interop/AmqpScheduledMessageTest.java  | 91 +++++++++++++++++++-
 .../src/test/resources/log4j.properties         |  1 +
 3 files changed, 112 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index a824cfb..c3dc1d3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -137,13 +137,32 @@ public abstract class InboundTransformer {
                 if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
                     // Legacy annotation, JMSType value will be replaced by Subject further
down if also present.
                     jms.setJMSType(entry.getValue().toString());
-                }
-                if ("x-opt-delivery-time".equals(key) && entry.getValue() != null)
{
+                } else if ("x-opt-delivery-time".equals(key) && entry.getValue()
!= null) {
                     long deliveryTime = ((Number) entry.getValue()).longValue();
                     long delay = deliveryTime - System.currentTimeMillis();
                     if (delay > 0) {
                         jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                     }
+                } else if ("x-opt-delivery-delay".equals(key) && entry.getValue()
!= null) {
+                    long delay = ((Number) entry.getValue()).longValue();
+                    if (delay > 0) {
+                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+                    }
+                } else if ("x-opt-delivery-repeat".equals(key) && entry.getValue()
!= null) {
+                    int repeat = ((Number) entry.getValue()).intValue();
+                    if (repeat > 0) {
+                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
+                    }
+                } else if ("x-opt-delivery-period".equals(key) && entry.getValue()
!= null) {
+                    long period = ((Number) entry.getValue()).longValue();
+                    if (period > 0) {
+                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
+                    }
+                } else if ("x-opt-delivery-cron".equals(key) && entry.getValue()
!= null) {
+                    String cronEntry = (String) entry.getValue();
+                    if (cronEntry != null) {
+                        jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
+                    }
                 }
 
                 setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());

http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
index cbe3598..14f4752 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.interop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.concurrent.TimeUnit;
 
@@ -148,13 +149,100 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport
{
         sender.send(message);
         sender.close();
 
+        // Read the message with short timeout, shouldn't get it.
+        try {
+            readMessages(getTestName(), 1, false, 500);
+            fail("Should not read the message");
+        } catch (Throwable ex) {
+        }
+
         // Read the message
         readMessages(getTestName(), 1, false);
 
         connection.close();
     }
 
+    @Test
+    public void testScheduleWithDelay() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Get the Queue View early to avoid racing the delivery.
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertNotNull(queueView);
+
+        AmqpMessage message = new AmqpMessage();
+        long delay = 5000;
+        message.setMessageAnnotation("x-opt-delivery-delay", delay);
+        message.setText("Test-Message");
+        sender.send(message);
+        sender.close();
+
+        // Read the message with short timeout, shouldn't get it.
+        try {
+            readMessages(getTestName(), 1, false, 1000);
+            fail("Should not read the message");
+        } catch (Throwable ex) {
+        }
+
+        // Read the message with long timeout, should get it.
+        try {
+            readMessages(getTestName(), 1, false, 10000);
+        } catch (Throwable ex) {
+            fail("Should read the message");
+        }
+
+        connection.close();
+    }
+
+    @Test
+    public void testScheduleRepeated() throws Exception {
+        final int NUMBER = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Get the Queue View early to avoid racing the delivery.
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertNotNull(queueView);
+
+        AmqpMessage message = new AmqpMessage();
+        long delay = 1000;
+        message.setMessageAnnotation("x-opt-delivery-delay", delay);
+        message.setMessageAnnotation("x-opt-delivery-period", 500);
+        message.setMessageAnnotation("x-opt-delivery-repeat", NUMBER - 1);
+        message.setText("Test-Message");
+        sender.send(message);
+        sender.close();
+
+        readMessages(getTestName(), NUMBER, false);
+        // Read the message with short timeout, shouldn't get it.
+        try {
+            readMessages(getTestName(), 1, false, 600);
+            fail("Should not read more messages");
+        } catch (Throwable ex) {
+        }
+
+        connection.close();
+    }
+
     public void readMessages(String destinationName, int count, boolean topic) throws Exception
{
+        readMessages(destinationName, count, topic, 5000);
+    }
+
+    public void readMessages(String destinationName, int count, boolean topic, long timeout)
throws Exception {
         Connection connection = createJMSConnection();
         connection.start();
 
@@ -169,8 +257,9 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
 
             MessageConsumer consumer = session.createConsumer(destination);
             for (int i = 1; i <= count; i++) {
-                Message received = consumer.receive(5000);
+                Message received = consumer.receive(timeout);
                 assertNotNull(received);
+                LOG.info("Read next message: {}", received.getJMSMessageID());
             }
         } finally {
             connection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index 63c4701..4111e16 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,6 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.broker.scheduler=TRACE
 log4j.logger.org.apache.activemq.transport.amqp=DEBUG
 log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
 log4j.logger.org.fusesource=INFO


Mime
View raw message