activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r908867 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/scheduler/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/br...
Date Thu, 11 Feb 2010 08:18:42 GMT
Author: rajdavies
Date: Thu Feb 11 08:18:36 2010
New Revision: 908867

URL: http://svn.apache.org/viewvc?rev=908867&view=rev
Log:
Added cron support for scheduled delivery - this is a further enhancement for
https://issues.apache.org/activemq/browse/AMQ-451

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java Thu Feb 11 08:18:36 2010
@@ -18,9 +18,10 @@
 
 public interface ScheduledMessage {
     /**
-     * The time in milliseconds that a message will be scheduled to be delivered by the broker
+     * The time in milliseconds that a message will wait before being scheduled to be 
+     * delivered by the broker
      */
-    public static final String AMQ_SCHEDULED_START = "AMQ_SCHEDULED_START_TIME";
+    public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
     /**
      * The time in milliseconds to wait after the start time to wait before scheduling the message again
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Thu Feb 11 08:18:36 2010
@@ -408,6 +408,7 @@
             addItem("jobId", "jobId", SimpleType.STRING);
             addItem("cronEntry", "Cron entry", SimpleType.STRING);
             addItem("start", "start time", SimpleType.STRING);
+            addItem("delay", "initial delay", SimpleType.LONG);
             addItem("next", "next time", SimpleType.STRING);
             addItem("period", "period between jobs", SimpleType.LONG);
             addItem("repeat", "number of times to repeat", SimpleType.INTEGER);
@@ -420,6 +421,7 @@
             rc.put("jobId", job.getJobId());
             rc.put("cronEntry", "" + job.getCronEntry());
             rc.put("start", job.getStartTime());
+            rc.put("delay", job.getDelay());
             rc.put("next", job.getNextExecutionTime());
             rc.put("period", job.getPeriod());
             rc.put("repeat", job.getRepeat());

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java?rev=908867&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java Thu Feb 11 08:18:36 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.StringTokenizer;
+import javax.jms.MessageFormatException;
+
+public class CronParser {
+    private static final int NUMBER_TOKENS = 5;
+    private static final int MINUTES = 0;
+    private static final int HOURS = 1;
+    private static final int DAY_OF_MONTH = 2;
+    private static final int MONTH = 3;
+    private static final int DAY_OF_WEEK = 4;
+
+    public static long getNextScheduledTime(final String cronEntry, long currentTime) throws MessageFormatException {
+        long result = 0;
+        if (cronEntry != null && cronEntry.length() > 0) {
+            List<String> list = tokenize(cronEntry);
+            List<CronEntry> entries = buildCronEntries(list);
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTimeInMillis(currentTime);
+            int currentMinutes = calendar.get(Calendar.MINUTE);
+            int currentHours = calendar.get(Calendar.HOUR_OF_DAY);
+            int currentDayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
+            int currentMonth = calendar.get(Calendar.MONTH) + 1;
+            int currentDayOfWeek = calendar.get(Calendar.DAY_OF_WEEK) - 1;
+
+            CronEntry minutes = entries.get(MINUTES);
+            CronEntry hours = entries.get(HOURS);
+            CronEntry dayOfMonth = entries.get(DAY_OF_MONTH);
+            CronEntry month = entries.get(MONTH);
+            CronEntry dayOfWeek = entries.get(DAY_OF_MONTH);
+            if (!isCurrent(month, currentMonth)) {
+                int nextMonth = getNext(month, currentMonth);
+                Calendar working = (Calendar) calendar.clone();
+                working.add(Calendar.MONTH, nextMonth);
+                result += working.getTimeInMillis();
+            }
+            if (!isCurrent(dayOfMonth, currentDayOfMonth)) {
+                int nextDay = getNext(dayOfMonth, currentMonth);
+                Calendar working = (Calendar) calendar.clone();
+                working.add(Calendar.DAY_OF_MONTH, nextDay);
+                result += working.getTimeInMillis();
+            }
+            if (!isCurrent(dayOfWeek, currentDayOfWeek)) {
+                int nextDay = getNext(dayOfWeek, currentDayOfWeek);
+                Calendar working = (Calendar) calendar.clone();
+                working.add(Calendar.DAY_OF_WEEK, nextDay);
+                result += working.getTimeInMillis();
+            }
+            if (!isCurrent(hours, currentHours)) {
+                int nextHour = getNext(hours, currentHours);
+                Calendar working = (Calendar) calendar.clone();
+                working.add(Calendar.HOUR_OF_DAY, nextHour);
+                result += working.getTimeInMillis();
+            }
+            if (!isCurrent(minutes, currentMinutes)) {
+                int nextMinutes = getNext(minutes, currentMinutes);
+                Calendar working = (Calendar) calendar.clone();
+                working.add(Calendar.MINUTE, nextMinutes);
+                result += working.getTimeInMillis();
+            }
+            if (result == 0) {
+                // this can occur for "* * * * *"
+                result = currentTime + 60 * 1000;
+                result = result / 1000 * 1000;
+            }
+        }
+        return result;
+    }
+
+    static List<String> tokenize(String cron) throws IllegalArgumentException {
+        StringTokenizer tokenize = new StringTokenizer(cron);
+        List<String> result = new ArrayList<String>();
+        while (tokenize.hasMoreTokens()) {
+            result.add(tokenize.nextToken());
+        }
+        if (result.size() != NUMBER_TOKENS) {
+            throw new IllegalArgumentException("Not a valid cron entry - wrong number of tokens(" + result.size()
+                    + "): " + cron);
+        }
+        return result;
+    }
+
+    public static void validate(final String cronEntry) throws MessageFormatException {
+        List<String> list = tokenize(cronEntry);
+        List<CronEntry> entries = buildCronEntries(list);
+        for (CronEntry e : entries) {
+            validate(e);
+        }
+    }
+
+    static void validate(CronEntry entry) throws MessageFormatException {
+
+        List<Integer> list = calculateValues(entry);
+        if (list.isEmpty() || list.get(0).intValue() < entry.start || list.get(list.size() - 1).intValue() > entry.end) {
+            throw new MessageFormatException("Invalid token: " + entry);
+        }
+    }
+
+    static int getNext(final CronEntry entry, final int current) throws MessageFormatException {
+        int result = 0;
+        List<Integer> list = calculateValues(entry);
+        Collections.sort(list);
+        int next = -1;
+        for (Integer i : list) {
+            if (i.intValue() > current) {
+                next = i.intValue();
+                break;
+            }
+        }
+        if (next != -1) {
+            result = next - current;
+        } else {
+            int first = list.get(0).intValue();
+            result = entry.end + first - entry.start - current;
+        }
+
+        return result;
+    }
+
+    static boolean isCurrent(final CronEntry entry, final int current) throws MessageFormatException {
+
+        List<Integer> list = calculateValues(entry);
+        boolean result = list.contains(new Integer(current));
+        return result;
+    }
+
+    protected static List<Integer> calculateValues(CronEntry entry) {
+        List<Integer> result = new ArrayList<Integer>();
+        if (isAll(entry.token)) {
+            for (int i = entry.start; i < entry.end; i++) {
+                result.add(i);
+            }
+        } else if (isAStep(entry.token)) {
+            int denominator = getDenominator(entry.token);
+            String numerator = getNumerator(entry.token);
+            CronEntry ce = new CronEntry(entry.name, numerator, entry.start, entry.end);
+            List<Integer> list = calculateValues(ce);
+            for (Integer i : list) {
+                if (i.intValue() % denominator == 0) {
+                    result.add(i);
+                }
+            }
+        } else if (isAList(entry.token)) {
+            StringTokenizer tokenizer = new StringTokenizer(entry.token, ",");
+            while (tokenizer.hasMoreTokens()) {
+                String str = tokenizer.nextToken();
+                CronEntry ce = new CronEntry(entry.name, str, entry.start, entry.end);
+                List<Integer> list = calculateValues(ce);
+                result.addAll(list);
+            }
+        } else if (isARange(entry.token)) {
+            int index = entry.token.indexOf('-');
+            int first = Integer.parseInt(entry.token.substring(0, index));
+            int last = Integer.parseInt(entry.token.substring(index + 1));
+            for (int i = first; i <= last; i++) {
+                result.add(i);
+            }
+        } else {
+            int value = Integer.parseInt(entry.token);
+            result.add(value);
+        }
+        return result;
+    }
+
+    protected static boolean isARange(String token) {
+        return token != null && token.indexOf('-') >= 0;
+    }
+
+    protected static boolean isAStep(String token) {
+        return token != null && token.indexOf('/') >= 0;
+    }
+
+    protected static boolean isAList(String token) {
+        return token != null && token.indexOf(',') >= 0;
+    }
+
+    protected static boolean isAll(String token) {
+        return token != null && token.length() == 1 && token.charAt(0) == '*';
+    }
+
+    protected static int getDenominator(final String token) {
+        int result = 0;
+        int index = token.indexOf('/');
+        String str = token.substring(index + 1);
+        result = Integer.parseInt(str);
+        return result;
+    }
+
+    protected static String getNumerator(final String token) {
+        int index = token.indexOf('/');
+        String str = token.substring(0, index);
+        return str;
+    }
+
+    static List<CronEntry> buildCronEntries(List<String> tokens) {
+        List<CronEntry> result = new ArrayList<CronEntry>();
+        CronEntry minutes = new CronEntry("Minutes", tokens.get(MINUTES), 0, 59);
+        result.add(minutes);
+        CronEntry hours = new CronEntry("Hours", tokens.get(HOURS), 0, 23);
+        result.add(hours);
+        CronEntry dayOfMonth = new CronEntry("DayOfMonth", tokens.get(DAY_OF_MONTH), 1, 31);
+        result.add(dayOfMonth);
+        CronEntry month = new CronEntry("Month", tokens.get(MONTH), 1, 12);
+        result.add(month);
+        CronEntry dayOfWeek = new CronEntry("DayOfWeek", tokens.get(DAY_OF_WEEK), 0, 6);
+        result.add(dayOfWeek);
+        return result;
+    }
+
+    static class CronEntry {
+        final String name;
+        final String token;
+        final int start;
+        final int end;
+        CronEntry(String name, String token, int start, int end) {
+            this.name = name;
+            this.token = token;
+            this.start = start;
+            this.end = end;
+        }
+        @Override
+        public String toString() {
+            return this.name + ":" + token;
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/CronParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java Thu Feb 11 08:18:36 2010
@@ -35,6 +35,10 @@
     public abstract long getStart();
 
     /**
+     * @return the Delay
+     */
+    public abstract long getDelay();
+    /**
      * @return the period
      */
     public abstract long getPeriod();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java Thu Feb 11 08:18:36 2010
@@ -23,17 +23,17 @@
 
 
 public class JobImpl implements Job {
-    private final JobLocation location;
+    private final JobLocation jobLocation;
     private final byte[] payload;
     
     protected JobImpl(JobLocation location,ByteSequence bs) {
-        this.location=location;
+        this.jobLocation=location;
         this.payload = new byte[bs.getLength()];
         System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
     }
 
     public String getJobId() {
-        return this.location.getJobId();
+        return this.jobLocation.getJobId();
     }
 
     public byte[] getPayload() {
@@ -41,26 +41,29 @@
     }
 
     public long getPeriod() {
-       return this.location.getPeriod();
+       return this.jobLocation.getPeriod();
     }
 
     public int getRepeat() {
-       return this.location.getRepeat();
+       return this.jobLocation.getRepeat();
     }
 
     public long getStart() {
-       return this.location.getStart();
+       return this.jobLocation.getStartTime();
+    }
+    
+    public long getDelay() {
+        return this.jobLocation.getDelay();
     }
 
     public String getCronEntry() {
-        return this.location.getCronEntry();
+        return this.jobLocation.getCronEntry();
     }
     
     
 
     public String getNextExecutionTime() {
-        // TODO Auto-generated method stub
-        return null;
+        return JobImpl.getDateTime(this.jobLocation.getNextTime());
     }
 
     public String getStartTime() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java Thu Feb 11 08:18:36 2010
@@ -28,7 +28,9 @@
    
     private String jobId;
     private int repeat;
-    private long start;
+    private long startTime;
+    private long delay;
+    private long nextTime;
     private long period;
     private String cronEntry;
     private final Location location;
@@ -41,11 +43,13 @@
     public JobLocation() {
         this(new Location());
     }
-
+   
     public void readExternal(DataInput in) throws IOException {
         this.jobId = in.readUTF();
         this.repeat = in.readInt();
-        this.start = in.readLong();
+        this.startTime = in.readLong();
+        this.delay = in.readLong();
+        this.nextTime = in.readLong();
         this.period = in.readLong();
         this.cronEntry=in.readUTF();
         this.location.readExternal(in);
@@ -54,7 +58,9 @@
     public void writeExternal(DataOutput out) throws IOException {
         out.writeUTF(this.jobId);
         out.writeInt(this.repeat);
-        out.writeLong(this.start);
+        out.writeLong(this.startTime);
+        out.writeLong(this.delay);
+        out.writeLong(this.nextTime);
         out.writeLong(this.period);
         if (this.cronEntry==null) {
             this.cronEntry="";
@@ -97,16 +103,30 @@
     /**
      * @return the start
      */
-    public long getStart() {
-        return this.start;
+    public long getStartTime() {
+        return this.startTime;
     }
 
     /**
      * @param start
      *            the start to set
      */
-    public void setStart(long start) {
-        this.start = start;
+    public void setStartTime(long start) {
+        this.startTime = start;
+    }
+    
+    /**
+     * @return the nextTime
+     */
+    public synchronized long getNextTime() {
+        return this.nextTime;
+    }
+
+    /**
+     * @param nextTime the nextTime to set
+     */
+    public synchronized void setNextTime(long nextTime) {
+        this.nextTime = nextTime;
     }
 
     /**
@@ -137,6 +157,24 @@
     public synchronized void setCronEntry(String cronEntry) {
         this.cronEntry = cronEntry;
     }
+    
+    public boolean isCron() {
+        return getCronEntry() != null && getCronEntry().length() > 0;
+    }
+    
+    /**
+     * @return the delay
+     */
+    public long getDelay() {
+        return this.delay;
+    }
+
+    /**
+     * @param delay the delay to set
+     */
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
 
     /**
      * @return the location

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Thu Feb 11 08:18:36 2010
@@ -48,17 +48,27 @@
      */
     public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception;
 
+    /**
+     * Add a job to be scheduled
+     * @param jobId a unique identifier for the job
+     * @param payload the message to be sent when the job is scheduled
+     * @param cronEntry - cron entry
+     * @throws Exception
+     */
+    public abstract void schedule(String jobId, ByteSequence payload,String cronEntry) throws Exception;
+
     
     /**
      * Add a job to be scheduled
      * @param jobId a unique identifier for the job
      * @param payload the message to be sent when the job is scheduled
-     * @param start 
+     * @param cronEntry - cron entry
+     * @param delay time in ms to wait before scheduling
      * @param period the time in milliseconds between successive executions of the Job
      * @param repeat the number of times to execute the job - less than 0 will be repeated forever
      * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws Exception;
+    public abstract void schedule(String jobId, ByteSequence payload,String cronEntry,long delay, long period, int repeat) throws Exception;
 
     /**
      * remove all jobs scheduled to run at this time

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Thu Feb 11 08:18:36 2010
@@ -119,10 +119,17 @@
         }
     }
 
-    public void schedule(String jobId, ByteSequence payload, long start, long period, int repeat) throws Exception {
+    public void schedule(String jobId, ByteSequence payload,String cronEntry, long start, long period, int repeat) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
         if (js !=null) {
-            js.schedule(jobId, payload, start,period,repeat);
+            js.schedule(jobId, payload, cronEntry,start,period,repeat);
         }
     }
+    public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.schedule(jobId, payload, cronEntry);
+        }
+        
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java Thu Feb 11 08:18:36 2010
@@ -25,6 +25,8 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.MessageFormatException;
+import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
@@ -44,6 +46,8 @@
     BTreeIndex<Long, List<JobLocation>> index;
     private Thread thread;
     private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+    private static final IdGenerator ID_GENERATOR = new IdGenerator();
+    private final ScheduleTime scheduleTime = new ScheduleTime();
 
     JobSchedulerImpl(JobSchedulerStore store) {
 
@@ -85,16 +89,25 @@
     public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
-                schedule(tx, jobId, payload, 0, delay, 0);
+                schedule(tx, jobId, payload, "", 0, delay, 0);
             }
         });
     }
 
-    public void schedule(final String jobId, final ByteSequence payload, final long start, final long period,
-            final int repeat) throws IOException {
+    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
-                schedule(tx, jobId, payload, start, period, repeat);
+                schedule(tx, jobId, payload, cronEntry, 0, 0, 0);
+            }
+        });
+
+    }
+
+    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay,
+            final long period, final int repeat) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, cronEntry, delay, period, repeat);
             }
         });
 
@@ -112,6 +125,14 @@
         });
     }
 
+    synchronized void removeFromIndex(final long time, final String jobId) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                removeFromIndex(tx, time, jobId);
+            }
+        });
+    }
+
     /*
      * (non-Javadoc)
      * @see org.apache.activemq.beanstalk.JobScheduler#remove(long,
@@ -154,7 +175,7 @@
                 Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
                 if (first != null) {
                     for (JobLocation jl : first.getValue()) {
-                        ByteSequence bs = getJob(jl.getLocation());
+                        ByteSequence bs = getPayload(jl.getLocation());
                         Job job = new JobImpl(jl, bs);
                         result.add(job);
                     }
@@ -173,7 +194,7 @@
                     Map.Entry<Long, List<JobLocation>> next = iter.next();
                     if (next != null) {
                         for (JobLocation jl : next.getValue()) {
-                            ByteSequence bs = getJob(jl.getLocation());
+                            ByteSequence bs = getPayload(jl.getLocation());
                             Job job = new JobImpl(jl, bs);
                             result.add(job);
                         }
@@ -196,7 +217,7 @@
                     Map.Entry<Long, List<JobLocation>> next = iter.next();
                     if (next != null && next.getKey().longValue() <= finish) {
                         for (JobLocation jl : next.getValue()) {
-                            ByteSequence bs = getJob(jl.getLocation());
+                            ByteSequence bs = getPayload(jl.getLocation());
                             Job job = new JobImpl(jl, bs);
                             result.add(job);
                         }
@@ -221,46 +242,85 @@
     public synchronized void removeAllJobs(final long start, final long finish) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
-                destroy(tx,start,finish);
+                destroy(tx, start, finish);
             }
         });
 
     }
 
-    ByteSequence getJob(Location location) throws IllegalStateException, IOException {
-        return this.store.getJob(location);
+    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+        return this.store.getPayload(location);
     }
 
-    void schedule(Transaction tx, String jobId, ByteSequence payload, long start, long period, int repeat)
-            throws IOException {
-        List<JobLocation> values = null;
-        long startTime;
-        long time;
-        if (start > 0) {
-            time = startTime = start;
-        } else {
-            startTime = System.currentTimeMillis();
-            time = startTime + period;
+    void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period,
+            int repeat) throws IOException {
+        long startTime = System.currentTimeMillis();
+        // round startTime - so we can schedule more jobs
+        // at the same time
+        startTime = (startTime / 1000) * 1000;
+        long time = 0;
+        if (cronEntry != null && cronEntry.length() > 0) {
+            try {
+                time = CronParser.getNextScheduledTime(cronEntry, startTime);
+            } catch (MessageFormatException e) {
+                throw new IOException(e.getMessage());
+            }
         }
-        if (this.index.containsKey(tx, time)) {
-            values = this.index.remove(tx, time);
+
+        if (time == 0) {
+            // start time not set by CRON - so set it to the current time
+            time = startTime;
         }
-        if (values == null) {
-            values = new ArrayList<JobLocation>();
+        if (delay > 0) {
+            time += delay;
+        } else {
+            time += period;
         }
 
         Location location = this.store.write(payload, false);
         JobLocation jobLocation = new JobLocation(location);
+        this.store.incrementJournalCount(tx, location);
         jobLocation.setJobId(jobId);
+        jobLocation.setStartTime(startTime);
+        jobLocation.setCronEntry(cronEntry);
+        jobLocation.setDelay(delay);
         jobLocation.setPeriod(period);
         jobLocation.setRepeat(repeat);
+        storeJob(tx, jobLocation, time);
+        this.scheduleTime.newJob();
+    }
+
+    synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                storeJob(tx, jobLocation, nextExecutionTime);
+            }
+        });
+    }
+
+    void storeJob(final Transaction tx, final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
+        List<JobLocation> values = null;
+        jobLocation.setNextTime(nextExecutionTime);
+        if (this.index.containsKey(tx, nextExecutionTime)) {
+            values = this.index.remove(tx, nextExecutionTime);
+        }
+        if (values == null) {
+            values = new ArrayList<JobLocation>();
+        }
         values.add(jobLocation);
-        this.index.put(tx, time, values);
-        this.store.incrementJournalCount(tx, location);
-        poke();
+        this.index.put(tx, nextExecutionTime, values);
+
     }
 
     void remove(Transaction tx, long time, String jobId) throws IOException {
+        JobLocation result = removeFromIndex(tx, time, jobId);
+        if (result != null) {
+            this.store.decrementJournalCount(tx, result.getLocation());
+        }
+    }
+
+    JobLocation removeFromIndex(Transaction tx, long time, String jobId) throws IOException {
+        JobLocation result = null;
         List<JobLocation> values = this.index.remove(tx, time);
         if (values != null) {
             for (int i = 0; i < values.size(); i++) {
@@ -270,11 +330,12 @@
                     if (!values.isEmpty()) {
                         this.index.put(tx, time, values);
                     }
-                    this.store.decrementJournalCount(tx, jl.getLocation());
+                    result = jl;
                     break;
                 }
             }
         }
+        return result;
     }
 
     void remove(Transaction tx, long time) throws IOException {
@@ -339,7 +400,7 @@
         }
     }
 
-    synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
+    private synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
         if (!this.store.isStopped() && !this.store.isStopping()) {
             Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
             return first;
@@ -348,12 +409,10 @@
 
     }
 
-    void fireJobs(List<JobLocation> list) throws IllegalStateException, IOException {
-        for (JobLocation jl : list) {
-            ByteSequence bs = this.store.getJob(jl.getLocation());
-            for (JobListener l : jobListeners) {
-                l.scheduledJob(jl.getJobId(), bs);
-            }
+    void fireJob(JobLocation job) throws IllegalStateException, IOException {
+        ByteSequence bs = this.store.getPayload(job.getLocation());
+        for (JobListener l : jobListeners) {
+            l.scheduledJob(job.getJobId(), bs);
         }
     }
 
@@ -382,6 +441,7 @@
 
     protected void mainLoop() {
         while (this.running.get()) {
+            this.scheduleTime.clearNewJob();
             try {
                 // peek the next job
                 long currentTime = System.currentTimeMillis();
@@ -389,35 +449,70 @@
                 Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
                 if (first != null) {
                     List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
-                    long executionTime = first.getKey();
+                    final long executionTime = first.getKey();
+                    long nextExecutionTime = 0;
                     if (executionTime <= currentTime) {
-                        fireJobs(list);
-                        for (JobLocation jl : list) {
-                            int repeat = jl.getRepeat();
-                            if (repeat != 0) {
-                                repeat--;
-                                ByteSequence payload = this.store.getJob(jl.getLocation());
-                                String jobId = jl.getJobId();
-                                long period = jl.getPeriod();
-                                schedule(jobId, payload, 0, period, repeat);
+
+                        for (final JobLocation job : list) {
+                            int repeat = job.getRepeat();
+                            nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
+                            long waitTime = nextExecutionTime - currentTime;
+                            this.scheduleTime.setWaitTime(waitTime);
+                            if (job.isCron() == false) {
+                                fireJob(job);
+                                if (repeat != 0) {
+                                    repeat--;
+                                    job.setRepeat(repeat);
+                                    // remove this job from the index - so it
+                                    // doesn't get destroyed
+                                    removeFromIndex(executionTime, job.getJobId());
+                                    // and re-store it
+                                    storeJob(job, nextExecutionTime);
+                                }
+                            } else {
+                                // cron job
+                                if (repeat == 0) {
+                                    // we haven't got a separate scheduler to
+                                    // execute at
+                                    // this time - just a cron job - so fire it
+                                    fireJob(job);
+                                }
+                                if (nextExecutionTime > currentTime) {
+                                    // we will run again ...
+                                    // remove this job from the index - so it
+                                    // doesn't get destroyed
+                                    removeFromIndex(executionTime, job.getJobId());
+                                    // and re-store it
+                                    storeJob(job, nextExecutionTime);
+                                    if (repeat != 0) {
+                                        // we have a separate schedule to run at
+                                        // this time
+                                        // so the cron job is used to set off a
+                                        // separate schedule
+                                        // hence we won't fire the original cron
+                                        // job to the listeners
+                                        // but we do need to start a separate
+                                        // schedule
+                                        String jobId = ID_GENERATOR.generateId();
+                                        ByteSequence payload = getPayload(job.getLocation());
+                                        schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
+                                        waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
+                                        this.scheduleTime.setWaitTime(waitTime);
+                                    }
+                                }
                             }
                         }
-                        // now remove jobs from this execution time
+                        // now remove all jobs that have not been
+                        // rescheduled from this execution time
                         remove(executionTime);
                     } else {
-                        long waitTime = executionTime - currentTime;
-                        synchronized (this.running) {
-                            this.running.wait(waitTime);
-                        }
-                    }
-                } else {
-                    synchronized (this.running) {
-                        this.running.wait(250);
+                        this.scheduleTime.setWaitTime(executionTime - currentTime);
                     }
                 }
 
-            } catch (InterruptedException e) {
-            } catch (IOException ioe) {
+                this.scheduleTime.pause();
+
+            } catch (Exception ioe) {
                 LOG.error(this.name + " Failed to schedule job", ioe);
                 try {
                     this.store.stop();
@@ -440,7 +535,7 @@
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         this.running.set(false);
-        poke();
+        this.scheduleTime.wakeup();
         Thread t = this.thread;
         if (t != null) {
             t.join(1000);
@@ -448,10 +543,15 @@
 
     }
 
-    protected void poke() {
-        synchronized (this.running) {
-            this.running.notifyAll();
+    long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
+        long result = currentTime;
+        String cron = job.getCronEntry();
+        if (cron != null && cron.length() > 0) {
+            result = CronParser.getNextScheduledTime(cron, result);
+        } else if (job.getRepeat() != 0) {
+            result += job.getPeriod();
         }
+        return result;
     }
 
     void createIndexes(Transaction tx) throws IOException {
@@ -496,4 +596,54 @@
             }
         }
     }
+
+    static class ScheduleTime {
+        private final int DEFAULT_WAIT = 500;
+        private final int DEFAULT_NEW_JOB_WAIT = 100;
+        private boolean newJob;
+        private long waitTime = DEFAULT_WAIT;
+        private final Object mutex = new Object();
+
+        /**
+         * @return the waitTime
+         */
+        long getWaitTime() {
+            return this.waitTime;
+        }
+        /**
+         * @param waitTime
+         *            the waitTime to set
+         */
+        void setWaitTime(long waitTime) {
+            if (!this.newJob) {
+                this.waitTime = waitTime > 0 ? waitTime : DEFAULT_WAIT;
+            }
+        }
+
+        void pause() {
+            synchronized (mutex) {
+                try {
+                    mutex.wait(this.waitTime);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        void newJob() {
+            this.newJob = true;
+            this.waitTime = DEFAULT_NEW_JOB_WAIT;
+            wakeup();
+        }
+
+        void clearNewJob() {
+            this.newJob = false;
+        }
+
+        void wakeup() {
+            synchronized (this.mutex) {
+                mutex.notifyAll();
+            }
+        }
+
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Thu Feb 11 08:18:36 2010
@@ -309,7 +309,7 @@
 
     }
 
-    synchronized ByteSequence getJob(Location location) throws IllegalStateException, IOException {
+    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
         ByteSequence result = null;
         result = this.journal.read(location);
         return result;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Thu Feb 11 08:18:36 2010
@@ -60,11 +60,11 @@
         LOG.info("Scheduler using directory: " + directory);
 
     }
-   
-    public synchronized  JobScheduler getJobScheduler() throws Exception {
+
+    public synchronized JobScheduler getJobScheduler() throws Exception {
         return new JobSchedulerFacade(this);
     }
-   
+
     /**
      * @return the directory
      */
@@ -102,25 +102,33 @@
 
     @Override
     public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
-        long start = 0;
+        long delay = 0;
         long period = 0;
         int repeat = 0;
-
+        String cronEntry = "";
+        Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
         Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
 
-        if (periodValue != null) {
-            period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
-            Object startValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_START);
-            if (startValue != null) {
-                start = (Long) TypeConversionSupport.convert(startValue, Long.class);
-            }
-            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
-            if (repeatValue != null) {
-                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
-            }
+        if (cronValue != null || periodValue != null) {
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
-            getInternalScheduler().schedule( messageSend.getMessageId().toString(),
-                    new ByteSequence(packet.data, packet.offset, packet.length),start, period, repeat);
+                if (cronValue != null) {
+                    cronEntry = cronValue.toString();
+                }
+                if (periodValue != null) {      
+                  period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+                }
+                Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
+                if (delayValue != null) {
+                    delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
+                }
+                Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+                if (repeatValue != null) {
+                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+                }
+                
+                getInternalScheduler().schedule(messageSend.getMessageId().toString(),
+                        new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
+            
 
         } else {
 
@@ -135,22 +143,29 @@
             Message messageSend = (Message) this.wireFormat.unmarshal(packet);
             messageSend.setOriginalTransactionId(null);
             Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+            String cronStr = cronValue != null ? cronValue.toString() : null;
+            int repeat = 0;
             if (repeatValue != null) {
-                int repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
-                if (repeat != 0) {
-                    //create a unique id - the original message could be sent lots of times
-                    messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+            }
+  
+                if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
+                    // create a unique id - the original message could be sent
+                    // lots of times
+                    messageSend
+                            .setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
                 }
-            }   
-            //Add the jobId as a property
+            
+            // Add the jobId as a property
             messageSend.setProperty("scheduledJobId", id);
-           
-            //if this goes across a network - we don't want it rescheduled
+
+            // if this goes across a network - we don't want it rescheduled
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
-            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START);
+            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
-            
-            
+            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
+
             final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
             producerExchange.setConnectionContext(context);
             producerExchange.setMutable(true);
@@ -161,8 +176,8 @@
         }
 
     }
-    
-    protected synchronized  JobScheduler getInternalScheduler() throws Exception {
+
+    protected synchronized JobScheduler getInternalScheduler() throws Exception {
         if (this.started.get()) {
             if (this.scheduler == null) {
                 this.scheduler = getStore().getJobScheduler("JMS");
@@ -173,8 +188,6 @@
         return null;
     }
 
-    
-
     private JobSchedulerStore getStore() throws Exception {
         if (started.get()) {
             if (this.store == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Thu Feb 11 08:18:36 2010
@@ -31,6 +31,7 @@
 import javax.jms.MessageNotWriteableException;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.scheduler.CronParser;
 import org.apache.activemq.filter.PropertyExpression;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.Callback;
@@ -413,6 +414,7 @@
         }
 
         checkValidObject(value);
+        checkValidScheduled(name, value);
         PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
 
         if (setter != null && value != null) {
@@ -454,6 +456,17 @@
             }
         }
     }
+    
+    protected void  checkValidScheduled(String name, Object value) throws MessageFormatException {
+        if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
+            if (value instanceof Long == false && value instanceof Integer == false) {
+                throw new MessageFormatException(name + " should be long or int value");
+            }
+        }
+        if (AMQ_SCHEDULED_CRON.equals(name)) {
+            CronParser.validate(value.toString());
+        }
+    }
 
     public Object getObjectProperty(String name) throws JMSException {
         if (name == null) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java?rev=908867&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java Thu Feb 11 08:18:36 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import java.util.List;
+import javax.jms.MessageFormatException;
+import org.junit.Test;
+
+public class CronParserTest {
+
+    @Test
+    public void testgetNextTimeMinutes() throws MessageFormatException {
+        String test = "30 * * * *";
+        long current = 20*60*1000;
+        long next = CronParser.getNextScheduledTime(test, current);
+        long result = next - current;
+        assertEquals(60*10*1000,result);
+    }
+    
+    @Test
+    public void testgetNextTimeHours() throws MessageFormatException {
+        String test = "* 1 * * *";
+        long current = 60*1000*60*5;
+        long next = CronParser.getNextScheduledTime(test, current);
+        long result = next - current;
+        assertEquals(60*1000*60*18,result);
+    }
+    
+    @Test
+    public void testValidate() {
+        try {
+            CronParser.validate("30 08 10 06 * ");
+            CronParser.validate("* * * * * ");
+            CronParser.validate("* * * * 1-6 ");
+            CronParser.validate("* * * * 1,2,5 ");
+            CronParser.validate("*/10 0-4,8-12 * * 1-2,3-6/2 ");
+
+        } catch (Exception e) {
+            fail("Should be valid ");
+        }
+
+        try {
+            CronParser.validate("61 08 10 06 * ");
+            fail("Should not be valid ");
+        } catch (Exception e) {
+        }
+        try {
+            CronParser.validate("61 08 06 * ");
+            fail("Should not be valid ");
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testGetNextCommaSeparated() throws MessageFormatException {
+        String token = "3,5,7";
+        // test minimum values
+        int next = CronParser.getNext(createEntry(token, 1, 10), 3);
+        assertEquals(2, next);
+        next = CronParser.getNext(createEntry(token, 1, 10), 8);
+        assertEquals(4, next);
+        next = CronParser.getNext(createEntry(token, 1, 10), 1);
+        assertEquals(2, next);
+    }
+
+    @Test
+    public void testGetNextRange() throws MessageFormatException {
+        String token = "3-5";
+        // test minimum values
+        int next = CronParser.getNext(createEntry(token, 1, 10), 3);
+        assertEquals(1, next);
+        next = CronParser.getNext(createEntry(token, 1, 10), 5);
+        assertEquals(7, next);
+        next = CronParser.getNext(createEntry(token, 1, 10), 6);
+        assertEquals(6, next);
+        next = CronParser.getNext(createEntry(token, 1, 10), 1);
+        assertEquals(2, next);
+    }
+
+    @Test
+    public void testGetNextExact() throws MessageFormatException {
+        String token = "3";
+        int next = CronParser.getNext(createEntry(token, 0, 10), 2);
+        assertEquals(1, next);
+        next = CronParser.getNext(createEntry(token, 0, 10), 3);
+        assertEquals(10, next);
+        next = CronParser.getNext(createEntry(token, 0, 10), 1);
+        assertEquals(2, next);
+    }
+
+    @Test
+    public void testTokenize() {
+        String test = "*/5 * * * *";
+        List<String> list = CronParser.tokenize(test);
+        assertEquals(list.size(), 5);
+
+        test = "*/5 * * * * *";
+        try {
+            list = CronParser.tokenize(test);
+            fail("Should have throw an exception");
+        } catch (Throwable e) {
+        }
+
+        test = "*/5 * * * *";
+        try {
+            list = CronParser.tokenize(test);
+            fail("Should have throw an exception");
+        } catch (Throwable e) {
+        }
+
+    }
+
+    public void testGetNextScheduledTime() {
+        fail("Not yet implemented");
+    }
+
+    CronParser.CronEntry createEntry(String str, int start, int end) {
+        return new CronParser.CronEntry("test", str, start, end);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/CronParserTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Thu Feb 11 08:18:36 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.scheduler;
 
+import java.io.File;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,10 +31,48 @@
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
 
 public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
 
-    
+    public void testCron() throws Exception {
+        final int COUNT = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+                count.incrementAndGet();
+            }
+        });
+
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 1000;
+        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, COUNT - 1);
+
+        producer.send(message);
+        producer.close();
+
+        Thread.sleep(500);
+        SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
+        JobScheduler js = sb.getJobScheduler();
+        List<Job> list = js.getAllJobs();
+        assertEquals(1, list.size());
+        latch.await(2,TimeUnit.MINUTES);
+        assertEquals(COUNT,count.get());
+    }
+
     public void testSchedule() throws Exception {
         final int COUNT = 1;
         Connection connection = createConnection();
@@ -49,15 +89,15 @@
         });
 
         connection.start();
-        long time =  5000;
+        long time = 5000;
         MessageProducer producer = session.createProducer(destination);
         TextMessage message = session.createTextMessage("test msg");
-        
+
         message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, time);
-      
+
         producer.send(message);
         producer.close();
-        //make sure the message isn't delivered early
+        // make sure the message isn't delivered early
         Thread.sleep(2000);
         assertEquals(latch.getCount(), COUNT);
         latch.await(5, TimeUnit.SECONDS);
@@ -84,16 +124,16 @@
         connection.start();
         MessageProducer producer = session.createProducer(destination);
         TextMessage message = session.createTextMessage("test msg");
-        long time = System.currentTimeMillis() + 4000;
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER-1);
+        long time = 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER - 1);
         producer.send(message);
         producer.close();
         assertEquals(latch.getCount(), NUMBER);
-        latch.await(5, TimeUnit.SECONDS);
-        assertEquals(latch.getCount(), 0);
-        //wait a little longer - make sure we only get NUMBER of replays
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+        // wait a little longer - make sure we only get NUMBER of replays
         Thread.sleep(1000);
         assertEquals(NUMBER, count.get());
     }
@@ -103,12 +143,17 @@
         bindAddress = "vm://localhost";
         super.setUp();
     }
-    
+
     @Override
     protected BrokerService createBroker() throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        IOHelper.mkdirs(schedulerDirectory);
+        IOHelper.deleteChildren(schedulerDirectory);
         BrokerService answer = new BrokerService();
         answer.setPersistent(isPersistent());
+        answer.setDeleteAllMessagesOnStartup(true);
         answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
         answer.setUseJmx(false);
         answer.addConnector(bindAddress);
         return answer;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Thu Feb 11 08:18:36 2010
@@ -40,17 +40,17 @@
         }
 		JobScheduler js = store.getJobScheduler("test");
 		int count = 0;
-		long startTime = System.currentTimeMillis()+10000;
+		long startTime = 10000;
 		for (ByteSequence job:list) {
-		    js.schedule("id:"+(count++), job,startTime,10000,-1);	    
+		    js.schedule("id:"+(count++), job,"",startTime,10000,-1);	    
 		}
-		List<Job>test = js.getNextScheduleJobs();
+		List<Job>test = js.getAllJobs();
 		assertEquals(list.size(),test.size());
 		store.stop();
 		
 		store.start();
 		js = store.getJobScheduler("test");
-		test = js.getNextScheduleJobs();
+		test = js.getAllJobs();
 		assertEquals(list.size(),test.size());
 		for (int i = 0; i < list.size();i++) {
 		    String orig = new String(list.get(i).getData());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=908867&r1=908866&r2=908867&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Thu Feb 11 08:18:36 2010
@@ -46,10 +46,10 @@
         });
         for (int i = 0; i < COUNT; i++) {
             String test = new String("test" + i);
-            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 10);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 1000);
         }
-        latch.await(1, TimeUnit.SECONDS);
-        assertTrue(latch.getCount() == 0);
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
     }
 
     @Test
@@ -63,10 +63,10 @@
             }
 
         });
-        long time = System.currentTimeMillis() + 2000;
+        long time = 2000;
         for (int i = 0; i < COUNT; i++) {
             String test = new String("test" + i);
-            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 10, -1);
         }
         assertTrue(latch.getCount() == COUNT);
         latch.await(3000, TimeUnit.SECONDS);
@@ -77,10 +77,10 @@
     public void testAddStopThenDeliver() throws Exception {
         final int COUNT = 10;
         final CountDownLatch latch = new CountDownLatch(COUNT);
-        long time = System.currentTimeMillis() + 2000;
+        long time =  2000;
         for (int i = 0; i < COUNT; i++) {
             String test = new String("test" + i);
-            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 1000, -1);
         }
         File directory = store.getDirectory();
         tearDown();
@@ -101,10 +101,10 @@
     public void testRemoveLong() throws Exception {
         final int COUNT = 10;
 
-        long time = System.currentTimeMillis() + 20000;
+        long time = 20000;
         for (int i = 0; i < COUNT; i++) {
             String str = new String("test" + i);
-            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000, -1);
 
         }
         int size = scheduler.getNextScheduleJobs().size();
@@ -119,12 +119,12 @@
     public void testRemoveString() throws Exception {
         final int COUNT = 10;
         final String test = "TESTREMOVE";
-        long time = System.currentTimeMillis() + 20000;
+        long time = 20000;
         for (int i = 0; i < COUNT; i++) {
             String str = new String("test" + i);
-            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 10, -1);
             if (i == COUNT / 2) {
-                scheduler.schedule(test, new ByteSequence(test.getBytes()), time, 10, -1);
+                scheduler.schedule(test, new ByteSequence(test.getBytes()), "", time, 10, -1);
             }
         }
 
@@ -139,11 +139,10 @@
     public void testgetAllJobs() throws Exception {
         final int COUNT = 10;
         final String ID = "id:";
-        final String test = "TEST";
-        long time = System.currentTimeMillis() + 20000;
+        long time =  20000;
         for (int i = 0; i < COUNT; i++) {
             String str = new String("test" + i);
-            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), time, 10 + i, -1);
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", time, 10 + i, -1);
         }
         List<Job> list = scheduler.getAllJobs();
 
@@ -160,22 +159,19 @@
     public void testgetAllJobsInRange() throws Exception {
         final int COUNT = 10;
         final String ID = "id:";
-        final String test = "TEST";
-        long start = System.currentTimeMillis() + 10000;
+        long start = 10000;
 
-        long time = System.currentTimeMillis() + 20000;
         for (int i = 0; i < COUNT; i++) {
             String str = new String("test" + i);
-            if (i < (COUNT - 2)) {
-                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0);
-            } else {
-                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0);
-            }
+           
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i * 1000), 10000 + i, 0);
+           
         }
-        long finish = start + ((COUNT - 2) * 1000);
+        start = System.currentTimeMillis();
+        long finish = start + 12000+ (COUNT * 1000);
         List<Job> list = scheduler.getAllJobs(start, finish);
 
-        assertEquals(list.size(), COUNT - 2);
+        assertEquals( COUNT,list.size());
         int count = 0;
         for (Job job : list) {
 
@@ -188,27 +184,19 @@
     public void testRemoveAllJobsInRange() throws Exception {
         final int COUNT = 10;
         final String ID = "id:";
-        final String test = "TEST";
-        long start = System.currentTimeMillis() + 10000;
+        long start = 10000;
 
-        long time = System.currentTimeMillis() + 20000;
         for (int i = 0; i < COUNT; i++) {
             String str = new String("test" + i);
-            if (i < (COUNT - 2)) {
-                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0);
-            } else {
-                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0);
-            }
+           
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i * 1000), 10000 + i, 0);
+           
         }
-        long finish = start + ((COUNT - 2) * 1000);
+        start = System.currentTimeMillis();
+        long finish = start + 12000+ (COUNT * 1000);
         scheduler.removeAllJobs(start, finish);
-        List<Job> list = scheduler.getAllJobs();
-        assertEquals(list.size(), 2);
-        int count = COUNT - 2;
-        for (Job job : list) {
-            assertEquals(job.getJobId(), ID + count);
-            count++;
-        }
+
+        assertTrue(scheduler.getAllJobs().isEmpty());
     }
 
     @Before



Mime
View raw message