activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6037
Date Fri, 13 Nov 2015 16:47:30 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x eccbd8715 -> e0e737743


https://issues.apache.org/jira/browse/AMQ-6037

Add support for the delivery time header to allow for scheduled messages
at a desired time.
(cherry picked from commit 480b3e7c36b157d12dfdc1318f5517c4050df312)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e0e73774
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e0e73774
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e0e73774

Branch: refs/heads/activemq-5.12.x
Commit: e0e737743cd0df401841b839fe4fb5507f6c8086
Parents: eccbd87
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Nov 9 12:00:38 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Nov 13 11:46:39 2015 -0500

----------------------------------------------------------------------
 .../amqp/message/InboundTransformer.java        |   8 +
 .../transport/amqp/AmqpTestSupport.java         |   6 +-
 .../amqp/interop/AmqpScheduledMessageTest.java  | 191 +++++++++++++++++++
 3 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e0e73774/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index 26d5753..a824cfb 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -23,6 +23,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.apache.activemq.ScheduledMessage;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
 import org.apache.qpid.proton.amqp.Decimal32;
@@ -137,6 +138,13 @@ public abstract class InboundTransformer {
                     // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
                     jms.setJMSType(entry.getValue().toString());
                 }
+                if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+                    long deliveryTime = ((Number) entry.getValue()).longValue();
+                    long delay = deliveryTime - System.currentTimeMillis();
+                    if (delay > 0) {
+                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+                    }
+                }
 
                 setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e0e73774/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 91909d4..957b12c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -102,7 +102,7 @@ public class AmqpTestSupport {
             kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
             brokerService.setPersistenceAdapter(kaha);
         }
-        brokerService.setSchedulerSupport(false);
+        brokerService.setSchedulerSupport(isSchedulerEnabled());
         brokerService.setAdvisorySupport(false);
         brokerService.setUseJmx(isUseJmx());
         brokerService.getManagementContext().setCreateConnector(false);
@@ -186,6 +186,10 @@ public class AmqpTestSupport {
         return true;
     }
 
+    protected boolean isSchedulerEnabled() {
+        return false;
+    }
+
     protected boolean isUseOpenWireConnector() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e0e73774/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
new file mode 100644
index 0000000..cbe3598
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Test for scheduled message support using AMQP message annotations.
+ */
+public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
+
+    @Override
+    protected boolean isSchedulerEnabled() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testSendWithDeliveryTimeIsScheduled() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Get the Queue View early to avoid racing the delivery.
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertNotNull(queueView);
+
+        AmqpMessage message = new AmqpMessage();
+        long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
+        message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+        message.setText("Test-Message");
+        sender.send(message);
+
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertEquals(1, view.getAllJobs().size());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendRecvWithDeliveryTime() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        // Get the Queue View early to avoid racing the delivery.
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertNotNull(queueView);
+
+        AmqpMessage message = new AmqpMessage();
+        long deliveryTime = System.currentTimeMillis() + 2000;
+        message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertTrue("Delayed message should be delivered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 1;
+            }
+        }));
+
+        // Now try and get the message
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+        Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
+        assertNotNull(msgDeliveryTime);
+        assertEquals(deliveryTime, msgDeliveryTime.longValue());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendScheduledReceiveOverOpenWire() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Get the Queue View early to avoid racing the delivery.
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertNotNull(queueView);
+
+        AmqpMessage message = new AmqpMessage();
+        long deliveryTime = System.currentTimeMillis() + 2000;
+        message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+        message.setText("Test-Message");
+        sender.send(message);
+        sender.close();
+
+        // Read the message
+        readMessages(getTestName(), 1, false);
+
+        connection.close();
+    }
+
+    public void readMessages(String destinationName, int count, boolean topic) throws Exception {
+        Connection connection = createJMSConnection();
+        connection.start();
+
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = null;
+            if (topic) {
+                destination = session.createTopic(destinationName);
+            } else {
+                destination = session.createQueue(destinationName);
+            }
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            for (int i = 1; i <= count; i++) {
+                Message received = consumer.receive(5000);
+                assertNotNull(received);
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
+        ObjectName objectName = brokerService.getAdminView().getJMSJobScheduler();
+        JobSchedulerViewMBean scheduler = null;
+        if (objectName != null) {
+            scheduler = (JobSchedulerViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(objectName, JobSchedulerViewMBean.class, true);
+        }
+
+        return scheduler;
+    }
+
+}


Mime
View raw message