activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1004411 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/scheduler/ test/java/org/apache/activemq/broker/scheduler/
Date Mon, 04 Oct 2010 20:35:19 GMT
Author: tabish
Date: Mon Oct  4 20:35:18 2010
New Revision: 1004411

URL: http://svn.apache.org/viewvc?rev=1004411&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2941

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=1004411&r1=1004410&r2=1004411&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java Mon
Oct  4 20:35:18 2010
@@ -18,7 +18,7 @@ package org.apache.activemq;
 
 public interface ScheduledMessage {
     /**
-     * The time in milliseconds that a message will wait before being scheduled to be 
+     * The time in milliseconds that a message will wait before being scheduled to be
      * delivered by the broker
      */
     public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
@@ -34,6 +34,48 @@ public interface ScheduledMessage {
      * Use a Cron tab entry to set the schedule
      */
     public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
-    
+    /**
+     * An Id that is assigned to a Scheduled Message, this value is only available once the
+     * Message is scheduled, Messages sent to the Browse Destination or delivered to the
+     * assigned Destination will have this value set.
+     */
+    public static final String AMQ_SCHEDULED_ID = "scheduledJobId";
+
+    /**
+     * Special destination to send Message's to with an assigned "action" that the Scheduler
+     * should perform such as removing a message.
+     */
+    public static final String AMQ_SCHEDULER_MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management";
+    /**
+     * Used to specify that a some operation should be performed on the Scheduled Message,
+     * the Message must have an assigned Id for this action to be taken.
+     */
+    public static final String AMQ_SCHEDULER_ACTION = "AMQ_SCHEDULER_ACTION";
+
+    /**
+     * Indicates that a browse of the Scheduled Messages is being requested.
+     */
+    public static final String AMQ_SCHEDULER_ACTION_BROWSE = "BROWSE";
+    /**
+     * Indicates that a Scheduled Message is to be remove from the Scheduler, the Id of
+     * the scheduled message must be set as a property in order for this action to have
+     * any effect.
+     */
+    public static final String AMQ_SCHEDULER_ACTION_REMOVE = "REMOVE";
+    /**
+     * Indicates that all scheduled Messages should be removed.
+     */
+    public static final String AMQ_SCHEDULER_ACTION_REMOVEALL = "REMOVEALL";
+
+    /**
+     * A property that holds the beginning of the time interval that the specified action
should
+     * be applied within.  Maps to a long value that specified time in milliseconds since
UTC.
+     */
+    public static final String AMQ_SCHEDULER_ACTION_START_TIME = "ACTION_START_TIME";
+    /**
+     * A property that holds the end of the time interval that the specified action should
be
+     * applied within.  Maps to a long value that specified time in milliseconds since UTC.
+     */
+    public static final String AMQ_SCHEDULER_ACTION_END_TIME = "ACTION_END_TIME";
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1004411&r1=1004410&r2=1004411&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
Mon Oct  4 20:35:18 2010
@@ -18,11 +18,14 @@ package org.apache.activemq.broker.sched
 
 import java.io.File;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
@@ -107,31 +110,80 @@ public class SchedulerBroker extends Bro
         long period = 0;
         int repeat = 0;
         String cronEntry = "";
+        String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
         Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
         Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
         Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
 
-        if (cronValue != null || periodValue != null || delayValue != null) {
+        String physicalName = messageSend.getDestination().getPhysicalName();
+        boolean schedularManage = physicalName.regionMatches(true, 0,
+        		ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
+        		ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+
+        if (schedularManage == true) {
+
+        	JobScheduler scheduler = getInternalScheduler();
+	        ActiveMQDestination replyTo = messageSend.getReplyTo();
+
+	        String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
+
+	        if (action != null ) {
+
+	        	Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
+	        	Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
+
+		        if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE))
{
+
+		        	if( startTime != null && endTime != null ) {
+
+		                long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+		                long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+			        	for (Job job : scheduler.getAllJobs(start, finish)) {
+			        		sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+			        	}
+		        	} else {
+			        	for (Job job : scheduler.getAllJobs()) {
+			        		sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+			        	}
+		        	}
+		        }
+		        if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE))
{
+		        	scheduler.remove(jobId);
+		        } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
+
+		        	if( startTime != null && endTime != null ) {
+
+		                long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+		                long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+		                scheduler.removeAllJobs(start, finish);
+		        	} else {
+			        	scheduler.removeAllJobs();
+		        	}
+		        }
+	        }
+
+        } else if ((cronValue != null || periodValue != null || delayValue != null) &&
jobId == null) {
             //clear transaction context
             Message msg = messageSend.copy();
             msg.setTransactionId(null);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
-                if (cronValue != null) {
-                    cronEntry = cronValue.toString();
-                }
-                if (periodValue != null) {      
-                  period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
-                }
-                if (delayValue != null) {
-                    delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
-                }
-                Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
-                if (repeatValue != null) {
-                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
-                }
-                getInternalScheduler().schedule(msg.getMessageId().toString(),
-                        new ByteSequence(packet.data, packet.offset, packet.length),cronEntry,
delay, period, repeat);
-            
+            if (cronValue != null) {
+                cronEntry = cronValue.toString();
+            }
+            if (periodValue != null) {
+              period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+            }
+            if (delayValue != null) {
+                delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
+            }
+            Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+            if (repeatValue != null) {
+                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+            }
+            getInternalScheduler().schedule(msg.getMessageId().toString(),
+                    new ByteSequence(packet.data, packet.offset, packet.length),cronEntry,
delay, period, repeat);
 
         } else {
             super.send(producerExchange, messageSend);
@@ -151,14 +203,14 @@ public class SchedulerBroker extends Bro
             if (repeatValue != null) {
                 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
             }
-  
-                if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
-                    // create a unique id - the original message could be sent
-                    // lots of times
-                    messageSend
-                            .setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
-                }
-            
+
+            if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
+                // create a unique id - the original message could be sent
+                // lots of times
+                messageSend.setMessageId(
+                		new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+            }
+
             // Add the jobId as a property
             messageSend.setProperty("scheduledJobId", id);
 
@@ -176,7 +228,6 @@ public class SchedulerBroker extends Bro
         } catch (Exception e) {
             LOG.error("Failed to send scheduled message " + id, e);
         }
-
     }
 
     protected synchronized JobScheduler getInternalScheduler() throws Exception {
@@ -202,4 +253,37 @@ public class SchedulerBroker extends Bro
         return null;
     }
 
+	protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination
replyTo)
+			throws Exception {
+
+        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
+        try {
+            Message msg = (Message) this.wireFormat.unmarshal(packet);
+            msg.setOriginalTransactionId(null);
+    		msg.setPersistent(false);
+    		msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+    		msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+    		msg.setDestination(replyTo);
+    		msg.setResponseRequired(false);
+    		msg.setProducerId(this.producerId);
+
+            // Add the jobId as a property
+    		msg.setProperty("scheduledJobId", job.getJobId());
+
+    		final boolean originalFlowControl = context.isProducerFlowControl();
+    		final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+    		producerExchange.setConnectionContext(context);
+    		producerExchange.setMutable(true);
+    		producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+    		try {
+    			context.setProducerFlowControl(false);
+    			this.next.send(producerExchange, msg);
+    		} finally {
+    			context.setProducerFlowControl(originalFlowControl);
+    		}
+        } catch (Exception e) {
+            LOG.error("Failed to send scheduled message " + job.getJobId(), e);
+        }
+
+	}
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java?rev=1004411&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
Mon Oct  4 20:35:18 2010
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
+
+    private static final transient Log LOG = LogFactory.getLog(JobSchedulerManagementTest.class);
+
+    public void testRemoveAllScheduled() throws Exception {
+        final int COUNT = 5;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+
+        // Send the remove request
+        MessageProducer producer = session.createProducer(management);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        producer.send(request);
+
+        // Now wait and see if any get delivered, none should.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(latch.getCount(), COUNT);
+    }
+
+    public void testRemoveAllScheduledAtTime() throws Exception {
+        final int COUNT = 3;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser"
+        MessageConsumer browser = session.createConsumer(browseDest);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browser.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+            	browsedLatch.countDown();
+            	LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
+        long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+
+        // Send the remove request
+        MessageProducer producer = session.createProducer(management);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+        					      ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
+        producer.send(request);
+
+        // Send the browse request
+        request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // now see if we got back only the one remaining message.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(2, browsedLatch.getCount());
+
+        // Now wait and see if any get delivered, none should.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(2, latch.getCount());
+    }
+
+    public void testBrowseAllScheduled() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser"
+        MessageConsumer browser = session.createConsumer(browseDest);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browser.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+            	browsedLatch.countDown();
+            	LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        // Send the browse request
+        MessageProducer producer = session.createProducer(requestBrowse);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // make sure the message isn't delivered early because we browsed it
+        Thread.sleep(2000);
+        assertEquals(latch.getCount(), COUNT);
+
+        // now see if we got all the scheduled messages on the browse destination.
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(browsedLatch.getCount(), 0);
+
+        // now check that they all got delivered
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(latch.getCount(), 0);
+    }
+
+    public void testBrowseWindowlScheduled() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10), COUNT);
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20));
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT + 2);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser"
+        MessageConsumer browser = session.createConsumer(browseDest);
+        final CountDownLatch browsedLatch = new CountDownLatch(COUNT);
+        browser.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+            	browsedLatch.countDown();
+            	LOG.debug("Scheduled Message Browser got Message: " + message);
+            }
+        });
+
+        connection.start();
+
+        long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6);
+        long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15);
+
+        // Send the browse request
+        MessageProducer producer = session.createProducer(requestBrowse);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // make sure the message isn't delivered early because we browsed it
+        Thread.sleep(2000);
+        assertEquals(COUNT + 2, latch.getCount());
+
+        // now see if we got all the scheduled messages on the browse destination.
+        latch.await(15, TimeUnit.SECONDS);
+        assertEquals(0, browsedLatch.getCount());
+
+        // now see if we got all the scheduled messages on the browse destination.
+        latch.await(20, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    public void testRemoveScheduled() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryQueue();
+
+        // Create the eventual Consumer to receive the scheduled message
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(management);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // Create the "Browser"
+        Session browseSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer browser = browseSession.createConsumer(browseDest);
+
+        connection.start();
+
+        // Send the browse request
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+        						  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // Browse all the Scheduled Messages.
+        for (int i = 0; i < COUNT; ++i) {
+        	Message message = browser.receive(2000);
+        	assertNotNull(message);
+
+        	try{
+        		Message remove = session.createMessage();
+        		remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+        				ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
+        		remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
+        				message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
+        		producer.send(remove);
+        	} catch(Exception e) {
+        	}
+        }
+
+        // now check that they all got removed and are not delivered.
+        latch.await(11, TimeUnit.SECONDS);
+        assertEquals(COUNT, latch.getCount());
+    }
+
+    public void testRemoveNotScheduled() throws Exception {
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+
+        MessageProducer producer = session.createProducer(management);
+
+    	try{
+
+	        // Send the remove request
+			Message remove = session.createMessage();
+			remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+					ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+			remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
+			producer.send(remove);
+    	} catch(Exception e) {
+    		fail("Caught unexpected exception during remove of unscheduled message.");
+    	}
+    }
+
+    public void testBrowseWithSelector() throws Exception {
+        Connection connection = createConnection();
+
+        // Setup the scheduled Message
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
+        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45));
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the Browse Destination and the Reply To location
+        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
+        Destination browseDest = session.createTemporaryTopic();
+
+        // Create the "Browser"
+        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY
+ " = 45000" );
+
+        connection.start();
+
+        // Send the browse request
+        MessageProducer producer = session.createProducer(requestBrowse);
+        Message request = session.createMessage();
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setJMSReplyTo(browseDest);
+        producer.send(request);
+
+        // Now try and receive the one we selected
+        Message message = browser.receive(5000);
+        assertNotNull(message);
+        assertEquals(45000, message.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));
+
+        // Now check if there are anymore, there shouldn't be
+        message = browser.receive(5000);
+        assertNull(message);
+    }
+
+
+    protected void scheduleMessage(Connection connection, long delay) throws Exception {
+    	scheduleMessage(connection, delay, 1);
+    }
+
+    protected void scheduleMessage(Connection connection, long delay, int count) throws Exception
{
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+
+        for(int i = 0; i < count; ++i ) {
+        	producer.send(message);
+        }
+
+        producer.close();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message