activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1071109 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
Date Tue, 15 Feb 2011 23:46:43 GMT
Author: tabish
Date: Tue Feb 15 23:46:43 2011
New Revision: 1071109

URL: http://svn.apache.org/viewvc?rev=1071109&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3000

Scheduler wasn't taking into account that there might be a job that was scheduled earlier
than the last wait time for jobs when they are fired.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=1071109&r1=1071108&r2=1071109&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
Tue Feb 15 23:46:43 2011
@@ -455,7 +455,7 @@ class JobSchedulerImpl extends ServiceSu
                 long currentTime = System.currentTimeMillis();
 
                 // Reads the list of the next entries and removes them from the store in
one atomic step.
-                // Prevents race conditions on short delays, when storeJob() tries to append
new items to the 
+                // Prevents race conditions on short delays, when storeJob() tries to append
new items to the
                 // existing list during this read operation (see AMQ-3141).
                 synchronized (this) {
                     Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
@@ -464,7 +464,6 @@ class JobSchedulerImpl extends ServiceSu
                         final long executionTime = first.getKey();
                         long nextExecutionTime = 0;
                         if (executionTime <= currentTime) {
-    
                             for (final JobLocation job : list) {
                                 int repeat = job.getRepeat();
                                 nextExecutionTime = calculateNextExecutionTime(job, currentTime,
repeat);
@@ -488,6 +487,7 @@ class JobSchedulerImpl extends ServiceSu
                                         // execute at
                                         // this time - just a cron job - so fire it
                                         fireJob(job);
+                                        //this.scheduleTime.setWaitTime(this.scheduleTime.DEFAULT_WAIT);
                                     }
                                     if (nextExecutionTime > currentTime) {
                                         // we will run again ...
@@ -517,6 +517,17 @@ class JobSchedulerImpl extends ServiceSu
                             // now remove all jobs that have not been
                             // rescheduled from this execution time
                             remove(executionTime);
+
+                            // If there is a job that should fire before the currently set
wait time
+                            // we need to reset wait time otherwise we'll miss it.
+                            Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
+                            if (nextUp != null) {
+	                            final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
+	                            if (timeUntilNextScheduled < this.scheduleTime.getWaitTime())
{
+	                            	this.scheduleTime.setWaitTime(timeUntilNextScheduled);
+	                            }
+                            }
+
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Not yet time to execute the job, waiting " + (executionTime
- currentTime) + " ms");

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java?rev=1071109&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
Tue Feb 15 23:46:43 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCronSchedulerTest.class);
+
+    public void testSimulatenousCron() throws Exception {
+
+        final int COUNT = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+                count.incrementAndGet();
+            	LOG.debug("Received one Message, count is at: " + count.get());
+            }
+        });
+
+        connection.start();
+        for (int i = 0; i < COUNT; i++) {
+	        MessageProducer producer = session.createProducer(destination);
+	        TextMessage message = session.createTextMessage("test msg "+i);
+	        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+	        producer.send(message);
+	        producer.close();
+	        //wait a couple sec so cron start time is different for next message
+            Thread.sleep(2000);
+        }
+        SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
+        JobScheduler js = sb.getJobScheduler();
+        List<Job> list = js.getAllJobs();
+        assertEquals(COUNT, list.size());
+        latch.await(2, TimeUnit.MINUTES);
+        //All should messages should have been received by now
+        assertEquals(COUNT, count.get());
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.getManagementContext().setCreateConnector(false);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message