Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C094318D69 for ; Fri, 13 Nov 2015 16:48:17 +0000 (UTC) Received: (qmail 56747 invoked by uid 500); 13 Nov 2015 16:48:17 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56712 invoked by uid 500); 13 Nov 2015 16:48:17 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56703 invoked by uid 99); 13 Nov 2015 16:48:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Nov 2015 16:48:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8C88CDFF8A; Fri, 13 Nov 2015 16:48:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <239ec73d983940e4942ac89439355b9e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6037 Date: Fri, 13 Nov 2015 16:48:17 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/activemq-5.12.x e0e737743 -> 3faf87ba8 https://issues.apache.org/jira/browse/AMQ-6037 Add support for amqp style variants of the ActiveMQ job scheduler options set in message annotations. (cherry picked from commit 5d353e241b0ba76ad1a0b42ef5c7a2ae54106860) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3faf87ba Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3faf87ba Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3faf87ba Branch: refs/heads/activemq-5.12.x Commit: 3faf87ba8a93b1db72dc06c0810140ab6d748b3d Parents: e0e7377 Author: Timothy Bish Authored: Tue Nov 10 18:12:40 2015 -0500 Committer: Timothy Bish Committed: Fri Nov 13 11:48:12 2015 -0500 ---------------------------------------------------------------------- .../amqp/message/InboundTransformer.java | 23 ++++- .../amqp/interop/AmqpScheduledMessageTest.java | 91 +++++++++++++++++++- .../src/test/resources/log4j.properties | 1 + 3 files changed, 112 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index a824cfb..c3dc1d3 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -137,13 +137,32 @@ public abstract class InboundTransformer { if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { // Legacy annotation, JMSType value will be replaced by Subject further down if also present. jms.setJMSType(entry.getValue().toString()); - } - if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); long delay = deliveryTime - System.currentTimeMillis(); if (delay > 0) { jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); } + } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + long delay = ((Number) entry.getValue()).longValue(); + if (delay > 0) { + jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + } + } else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) { + int repeat = ((Number) entry.getValue()).intValue(); + if (repeat > 0) { + jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); + } + } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) { + long period = ((Number) entry.getValue()).longValue(); + if (period > 0) { + jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); + } + } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) { + String cronEntry = (String) entry.getValue(); + if (cronEntry != null) { + jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry); + } } setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java index cbe3598..14f4752 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.TimeUnit; @@ -148,13 +149,100 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 500); + fail("Should not read the message"); + } catch (Throwable ex) { + } + // Read the message readMessages(getTestName(), 1, false); connection.close(); } + @Test + public void testScheduleWithDelay() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + // Get the Queue View early to avoid racing the delivery. + assertEquals(1, brokerService.getAdminView().getQueues().length); + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long delay = 5000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 1000); + fail("Should not read the message"); + } catch (Throwable ex) { + } + + // Read the message with long timeout, should get it. + try { + readMessages(getTestName(), 1, false, 10000); + } catch (Throwable ex) { + fail("Should read the message"); + } + + connection.close(); + } + + @Test + public void testScheduleRepeated() throws Exception { + final int NUMBER = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + // Get the Queue View early to avoid racing the delivery. + assertEquals(1, brokerService.getAdminView().getQueues().length); + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + long delay = 1000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setMessageAnnotation("x-opt-delivery-period", 500); + message.setMessageAnnotation("x-opt-delivery-repeat", NUMBER - 1); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + readMessages(getTestName(), NUMBER, false); + // Read the message with short timeout, shouldn't get it. + try { + readMessages(getTestName(), 1, false, 600); + fail("Should not read more messages"); + } catch (Throwable ex) { + } + + connection.close(); + } + public void readMessages(String destinationName, int count, boolean topic) throws Exception { + readMessages(destinationName, count, topic, 5000); + } + + public void readMessages(String destinationName, int count, boolean topic, long timeout) throws Exception { Connection connection = createJMSConnection(); connection.start(); @@ -169,8 +257,9 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { MessageConsumer consumer = session.createConsumer(destination); for (int i = 1; i <= count; i++) { - Message received = consumer.receive(5000); + Message received = consumer.receive(timeout); assertNotNull(received); + LOG.info("Read next message: {}", received.getJMSMessageID()); } } finally { connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/3faf87ba/activemq-amqp/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index 63c4701..4111e16 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,6 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO +log4j.logger.org.apache.activemq.broker.scheduler=TRACE log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO