activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1516561 - in /activemq/trunk: activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/ activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/
Date Thu, 22 Aug 2013 19:45:22 GMT
Author: tabish
Date: Thu Aug 22 19:45:22 2013
New Revision: 1516561

URL: http://svn.apache.org/r1516561
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4683

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
  (with props)
Modified:
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?rev=1516561&r1=1516560&r2=1516561&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
(original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
Thu Aug 22 19:45:22 2013
@@ -32,17 +32,17 @@ import org.apache.activemq.broker.schedu
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.broker.scheduler.JobListener;
 import org.apache.activemq.broker.scheduler.JobScheduler;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
@@ -51,12 +51,12 @@ class JobSchedulerImpl extends ServiceSu
     private String name;
     BTreeIndex<Long, List<JobLocation>> index;
     private Thread thread;
+    private final Object listenerLock = new Object();
     private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final ScheduleTime scheduleTime = new ScheduleTime();
 
     JobSchedulerImpl(JobSchedulerStoreImpl store) {
-
         this.store = store;
     }
 
@@ -66,65 +66,77 @@ class JobSchedulerImpl extends ServiceSu
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.beanstalk.JobScheduler#getName()
      */
+    @Override
     public String getName() {
         return this.name;
     }
 
     /*
      * (non-Javadoc)
-     * @see
-     * org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq
-     * .beanstalk.JobListener)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq.beanstalk.JobListener)
      */
+    @Override
     public void addListener(JobListener l) {
         this.jobListeners.add(l);
+        synchronized (this.listenerLock) {
+            this.listenerLock.notify();
+        }
     }
 
     /*
      * (non-Javadoc)
-     * @see
-     * org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache.
-     * activemq.beanstalk.JobListener)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache.activemq.beanstalk.JobListener)
      */
+    @Override
     public void removeListener(JobListener l) {
         this.jobListeners.remove(l);
     }
 
+    @Override
     public synchronized void schedule(final String jobId, final ByteSequence payload, final
long delay) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 schedule(tx, jobId, payload, "", 0, delay, 0);
             }
         });
     }
 
+    @Override
     public synchronized void schedule(final String jobId, final ByteSequence payload, final
String cronEntry) throws Exception {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 schedule(tx, jobId, payload, cronEntry, 0, 0, 0);
             }
         });
-
     }
 
-    public synchronized void schedule(final String jobId, final ByteSequence payload, final
String cronEntry, final long delay,
-            final long period, final int repeat) throws IOException {
+    @Override
+    public synchronized void schedule(final String jobId, final ByteSequence payload, final
String cronEntry, final long delay, final long period,
+        final int repeat) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 schedule(tx, jobId, payload, cronEntry, delay, period, repeat);
             }
         });
-
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.beanstalk.JobScheduler#remove(long)
      */
+    @Override
     public synchronized void remove(final long time) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 remove(tx, time);
             }
@@ -133,6 +145,7 @@ class JobSchedulerImpl extends ServiceSu
 
     synchronized void removeFromIndex(final long time, final String jobId) throws IOException
{
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 removeFromIndex(tx, time, jobId);
             }
@@ -141,11 +154,12 @@ class JobSchedulerImpl extends ServiceSu
 
     /*
      * (non-Javadoc)
-     * @see org.apache.activemq.beanstalk.JobScheduler#remove(long,
-     * java.lang.String)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String)
      */
     public synchronized void remove(final long time, final String jobId) throws IOException
{
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 remove(tx, time, jobId);
             }
@@ -154,16 +168,20 @@ class JobSchedulerImpl extends ServiceSu
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String)
      */
+    @Override
     public synchronized void remove(final String jobId) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 remove(tx, jobId);
             }
         });
     }
 
+    @Override
     public synchronized long getNextScheduleTime() throws IOException {
         Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
         return first != null ? first.getKey() : -1l;
@@ -171,12 +189,15 @@ class JobSchedulerImpl extends ServiceSu
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
      */
+    @Override
     public synchronized List<Job> getNextScheduleJobs() throws IOException {
         final List<Job> result = new ArrayList<Job>();
 
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
                 if (first != null) {
@@ -191,9 +212,11 @@ class JobSchedulerImpl extends ServiceSu
         return result;
     }
 
+    @Override
     public synchronized List<Job> getAllJobs() throws IOException {
         final List<Job> result = new ArrayList<Job>();
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
                 while (iter.hasNext()) {
@@ -208,15 +231,16 @@ class JobSchedulerImpl extends ServiceSu
                         break;
                     }
                 }
-
             }
         });
         return result;
     }
 
+    @Override
     public synchronized List<Job> getAllJobs(final long start, final long finish) throws
IOException {
         final List<Job> result = new ArrayList<Job>();
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(),
start);
                 while (iter.hasNext()) {
@@ -231,35 +255,36 @@ class JobSchedulerImpl extends ServiceSu
                         break;
                     }
                 }
-
             }
         });
         return result;
     }
 
+    @Override
     public synchronized void removeAllJobs() throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 destroy(tx);
             }
         });
     }
 
+    @Override
     public synchronized void removeAllJobs(final long start, final long finish) throws IOException
{
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 destroy(tx, start, finish);
             }
         });
-
     }
 
     ByteSequence getPayload(Location location) throws IllegalStateException, IOException
{
         return this.store.getPayload(location);
     }
 
-    void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long
delay, long period,
-            int repeat) throws IOException {
+    void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long
delay, long period, int repeat) throws IOException {
         long startTime = System.currentTimeMillis();
         // round startTime - so we can schedule more jobs
         // at the same time
@@ -301,6 +326,7 @@ class JobSchedulerImpl extends ServiceSu
 
     synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime)
throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>()
{
+            @Override
             public void execute(Transaction tx) throws IOException {
                 storeJob(tx, jobLocation, nextExecutionTime);
             }
@@ -318,7 +344,6 @@ class JobSchedulerImpl extends ServiceSu
         }
         values.add(jobLocation);
         this.index.put(tx, nextExecutionTime, values);
-
     }
 
     void remove(Transaction tx, long time, String jobId) throws IOException {
@@ -383,6 +408,7 @@ class JobSchedulerImpl extends ServiceSu
                 }
             }
         }
+
         for (Long l : keys) {
             this.index.remove(tx, l);
         }
@@ -404,6 +430,7 @@ class JobSchedulerImpl extends ServiceSu
                 break;
             }
         }
+
         for (Long l : keys) {
             this.index.remove(tx, l);
         }
@@ -415,7 +442,6 @@ class JobSchedulerImpl extends ServiceSu
             return first;
         }
         return null;
-
     }
 
     void fireJob(JobLocation job) throws IllegalStateException, IOException {
@@ -428,6 +454,7 @@ class JobSchedulerImpl extends ServiceSu
         }
     }
 
+    @Override
     public void run() {
         try {
             mainLoop();
@@ -453,6 +480,19 @@ class JobSchedulerImpl extends ServiceSu
 
     protected void mainLoop() {
         while (this.running.get()) {
+
+            // Can't start pumping messages until a listener is added otherwise we'd discard
messages
+            // without any warning.
+            synchronized (listenerLock) {
+                while (this.running.get() && this.jobListeners.isEmpty()) {
+                    try {
+                        LOG.debug("Scheduled Message dispatch paused while awaiting a Job
Listener");
+                        this.listenerLock.wait();
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+
             this.scheduleTime.clearNewJob();
             try {
                 // peek the next job
@@ -478,8 +518,7 @@ class JobSchedulerImpl extends ServiceSu
                                     if (repeat != 0) {
                                         repeat--;
                                         job.setRepeat(repeat);
-                                        // remove this job from the index - so it
-                                        // doesn't get destroyed
+                                        // remove this job from the index so it doesn't get
destroyed
                                         removeFromIndex(executionTime, job.getJobId());
                                         // and re-store it
                                         storeJob(job, nextExecutionTime);
@@ -487,28 +526,21 @@ class JobSchedulerImpl extends ServiceSu
                                 } else {
                                     // cron job
                                     if (repeat == 0) {
-                                        // we haven't got a separate scheduler to
-                                        // execute at
+                                        // we haven't got a separate scheduler to execute
at
                                         // this time - just a cron job - so fire it
                                         fireJob(job);
-                                        //this.scheduleTime.setWaitTime(this.scheduleTime.DEFAULT_WAIT);
                                     }
                                     if (nextExecutionTime > currentTime) {
                                         // we will run again ...
-                                        // remove this job from the index - so it
-                                        // doesn't get destroyed
+                                        // remove this job from the index - so it doesn't
get destroyed
                                         removeFromIndex(executionTime, job.getJobId());
                                         // and re-store it
                                         storeJob(job, nextExecutionTime);
                                         if (repeat != 0) {
-                                            // we have a separate schedule to run at
-                                            // this time
-                                            // so the cron job is used to set of a
-                                            // seperate scheule
-                                            // hence we won't fire the original cron
-                                            // job to the listeners
-                                            // but we do need to start a separate
-                                            // schedule
+                                            // we have a separate schedule to run at this
time
+                                            // so the cron job is used to set off a separate
schedule
+                                            // hence we won't fire the original cron job
to the
+                                            // listeners but we do need to start a separate
schedule
                                             String jobId = ID_GENERATOR.generateId();
                                             ByteSequence payload = getPayload(job.getLocation());
                                             schedule(jobId, payload, "", job.getDelay(),
job.getPeriod(), job.getRepeat());
@@ -526,12 +558,11 @@ class JobSchedulerImpl extends ServiceSu
                             // we need to reset wait time otherwise we'll miss it.
                             Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
                             if (nextUp != null) {
-	                            final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
-	                            if (timeUntilNextScheduled < this.scheduleTime.getWaitTime())
{
-	                            	this.scheduleTime.setWaitTime(timeUntilNextScheduled);
-	                            }
+                                final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
+                                if (timeUntilNextScheduled < this.scheduleTime.getWaitTime())
{
+                                    this.scheduleTime.setWaitTime(timeUntilNextScheduled);
+                                }
                             }
-
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Not yet time to execute the job, waiting " + (executionTime
- currentTime) + " ms");
@@ -541,7 +572,6 @@ class JobSchedulerImpl extends ServiceSu
                     }
                 }
                 this.scheduleTime.pause();
-
             } catch (Exception ioe) {
                 LOG.error(this.name + " Failed to schedule job", ioe);
                 try {
@@ -556,10 +586,12 @@ class JobSchedulerImpl extends ServiceSu
     @Override
     protected void doStart() throws Exception {
         this.running.set(true);
+        synchronized (this.listenerLock) {
+            this.listenerLock.notify();
+        }
         this.thread = new Thread(this, "JobScheduler:" + this.name);
         this.thread.setDaemon(true);
         this.thread.start();
-
     }
 
     @Override
@@ -570,7 +602,6 @@ class JobSchedulerImpl extends ServiceSu
         if (t != null) {
             t.join(1000);
         }
-
     }
 
     long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat)
throws MessageFormatException {
@@ -608,6 +639,8 @@ class JobSchedulerImpl extends ServiceSu
 
     static class ValueMarshaller extends VariableMarshaller<List<JobLocation>>
{
         static ValueMarshaller INSTANCE = new ValueMarshaller();
+
+        @Override
         public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
             List<JobLocation> result = new ArrayList<JobLocation>();
             int size = dataIn.readInt();
@@ -619,6 +652,7 @@ class JobSchedulerImpl extends ServiceSu
             return result;
         }
 
+        @Override
         public void writePayload(List<JobLocation> value, DataOutput dataOut) throws
IOException {
             dataOut.writeInt(value.size());
             for (JobLocation jobLocation : value) {
@@ -640,6 +674,7 @@ class JobSchedulerImpl extends ServiceSu
         long getWaitTime() {
             return this.waitTime;
         }
+
         /**
          * @param waitTime
          *            the waitTime to set
@@ -674,6 +709,5 @@ class JobSchedulerImpl extends ServiceSu
                 mutex.notifyAll();
             }
         }
-
     }
 }

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java?rev=1516561&r1=1516560&r2=1516561&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
(original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
Thu Aug 22 19:45:22 2013
@@ -65,7 +65,6 @@ public class JobSchedulerStoreImpl exten
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     private boolean enableIndexWriteAsync = false;
-    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
     MetaData metaData = new MetaData(this);
     final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
     Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java?rev=1516561&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
Thu Aug 22 19:45:22 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LostScheduledMessagesTest {
+
+    private BrokerService broker;
+
+    private static final File schedulerDirectory = new File("target/test/ScheduledDB");
+    private static final File messageDirectory = new File("target/test/MessageDB");
+    private static final String QUEUE_NAME = "test";
+
+    @Before
+    public void setup() throws Exception {
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
+
+        IOHelper.mkdirs(messageDirectory);
+        IOHelper.deleteChildren(messageDirectory);
+    }
+
+    private void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(false);
+        broker.setDataDirectory("target");
+        broker.setSchedulerDirectoryFile(schedulerDirectory);
+        broker.setDataDirectoryFile(messageDirectory);
+        broker.setUseJmx(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        BasicConfigurator.resetConfiguration();
+    }
+
+    @Test
+    public void MessagePassedNotUsingScheduling() throws Exception {
+        doTest(false);
+    }
+
+    @Test
+    public void MessageLostWhenUsingScheduling() throws Exception {
+        doTest(true);
+    }
+
+    private void doTest(boolean useScheduling) throws Exception {
+
+        int DELIVERY_DELAY_MS = 5000;
+
+        startBroker();
+
+        long startTime = System.currentTimeMillis();
+
+        // Send a message scheduled for delivery in 5 seconds
+        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+        Message message = session.createTextMessage("test");
+        if (useScheduling) {
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELIVERY_DELAY_MS);
+        }
+        producer.send(message);
+
+        session.close();
+        connection.close();
+
+        broker.getServices();
+
+        // shut down broker
+        broker.stop();
+        broker.waitUntilStopped();
+
+        // Make sure that the broker has stopped within the delivery delay
+        long shutdownTime = System.currentTimeMillis();
+        assertTrue("Failed to shut down broker in expected time. Test results inconclusive",
shutdownTime - startTime < DELIVERY_DELAY_MS);
+
+        // make sure that delivery falls into down time window
+        TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS);
+
+        // Start new broker instance
+        startBroker();
+
+        final AtomicLong receiveCounter = new AtomicLong();
+
+        cf = new ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                receiveCounter.incrementAndGet();
+            }
+        });
+
+        // Wait for a while to let MQ process the message
+        TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS * 2);
+
+        session.close();
+        connection.close();
+
+        assertEquals(1, receiveCounter.get());
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message