activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r907197 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/scheduler/ test/java/org/apache/activemq/bro...
Date Sat, 06 Feb 2010 08:57:24 GMT
Author: rajdavies
Date: Sat Feb  6 08:57:24 2010
New Revision: 907197

URL: http://svn.apache.org/viewvc?rev=907197&view=rev
Log:
Added JMX support for JobScheduling

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java Sat Feb  6 08:57:24 2010
@@ -29,6 +29,10 @@
      * The number of times to repeat scheduling a message for delivery
      */
     public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
+    /**
+     * Use a Cron tab entry to set the schedule
+     */
+    public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
     
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Feb  6 08:57:24 2010
@@ -45,6 +45,8 @@
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.FTConnectorView;
 import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.JobSchedulerView;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.broker.jmx.NetworkConnectorView;
@@ -1764,7 +1766,23 @@
     protected Broker addInterceptors(Broker broker) throws Exception {
         broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
         if (isSchedulerSupport()) {
-            broker = new SchedulerBroker(broker,getSchedulerDirectoryFile());
+            SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
+            if (isUseJmx()) {
+                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
+                try {
+                    ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
+                            + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
+                            + "Type=jobScheduler," + "jobSchedulerName=JMS");
+
+                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
+                    this.adminView.setJMSJobScheduler(objectName);
+                } catch (Throwable e) {
+                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
+                            + e.getMessage(), e);
+                }
+
+            }
+            broker = sb;
         }
         if (isAdvisorySupport()) {
             broker = new AdvisoryBroker(broker);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Sat Feb  6 08:57:24 2010
@@ -44,6 +44,7 @@
     ManagedRegionBroker broker;
     private final BrokerService brokerService;
     private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
+    private ObjectName jmsJobScheduler;
 
     public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
         this.brokerService = brokerService;
@@ -365,4 +366,12 @@
             return "";
         }
     }
+
+    public ObjectName getJMSJobScheduler() {
+        return this.jmsJobScheduler;
+    }
+    
+    public void setJMSJobScheduler(ObjectName name) {
+        this.jmsJobScheduler=name;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Sat Feb  6 08:57:24 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.jmx;
 
 import javax.management.ObjectName;
-
 import org.apache.activemq.Service;
 
 
@@ -244,4 +243,7 @@
     @MBeanInfo("The location of the data directory")
     public String getDataDirectory();
     
+    @MBeanInfo("JMSJobScheduler")
+    ObjectName getJMSJobScheduler();
+    
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java?rev=907197&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java Sat Feb  6 08:57:24 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.io.IOException;
+import java.util.List;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobImpl;
+import org.apache.activemq.broker.scheduler.JobScheduler;
+
+public class JobSchedulerView implements JobSchedulerViewMBean {
+
+    private final JobScheduler jobScheduler;
+
+    public JobSchedulerView(JobScheduler jobScheduler) {
+        this.jobScheduler = jobScheduler;
+    }
+
+    public TabularData getAllJobs() throws Exception {
+        OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
+        CompositeType ct = factory.getCompositeType();
+        TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
+        TabularDataSupport rc = new TabularDataSupport(tt);
+        List<Job> jobs = this.jobScheduler.getAllJobs();
+        for (Job job : jobs) {
+            rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
+        }
+        return rc;
+    }
+
+    public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
+        OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
+        CompositeType ct = factory.getCompositeType();
+        TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
+        TabularDataSupport rc = new TabularDataSupport(tt);
+        long start = JobImpl.getDataTime(startTime);
+        long finish = JobImpl.getDataTime(finishTime);
+        List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
+        for (Job job : jobs) {
+            rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
+        }
+        return rc;
+    }
+
+    public TabularData getNextScheduleJobs() throws Exception {
+        OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
+        CompositeType ct = factory.getCompositeType();
+        TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
+        TabularDataSupport rc = new TabularDataSupport(tt);
+        List<Job> jobs = this.jobScheduler.getNextScheduleJobs();
+        for (Job job : jobs) {
+            rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
+        }
+        return rc;
+    }
+
+    public String getNextScheduleTime() throws Exception {
+        long time = this.jobScheduler.getNextScheduleTime();
+        return JobImpl.getDateTime(time);
+    }
+
+    public void removeAllJobs() throws Exception {
+        this.jobScheduler.removeAllJobs();
+
+    }
+
+    public void removeAllJobs(String startTime, String finishTime) throws Exception {
+        long start = JobImpl.getDataTime(startTime);
+        long finish = JobImpl.getDataTime(finishTime);
+        this.jobScheduler.removeAllJobs(start, finish);
+
+    }
+
+    public void removeJob(String jobId) throws Exception {
+        this.jobScheduler.remove(jobId);
+
+    }
+
+    public void removeJobAtScheduledTime(String time) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java?rev=907197&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java Sat Feb  6 08:57:24 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.TabularData;
+
+
+
+public interface JobSchedulerViewMBean {
+    /**
+     * remove all jobs scheduled to run at this time
+     * @param time
+     * @throws Exception
+     */
+    @MBeanInfo("remove jobs with matching execution time")
+    public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
+
+    /**
+     * remove a job with the matching jobId
+     * @param jobId
+     * @throws Exception
+     */
+    @MBeanInfo("remove jobs with matching jobId")
+    public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
+    
+    /**
+     * remove all the Jobs from the scheduler
+     * @throws Exception
+     */
+    @MBeanInfo("remove all scheduled jobs")
+    public abstract void removeAllJobs() throws Exception;
+    
+    /**
+     * remove all the Jobs from the scheduler that are due between the start and finish times
+     * @param start time 
+     * @param finish time
+     * @throws Exception
+     */
+    @MBeanInfo("remove all scheduled jobs between time ranges ")
+    public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
+    
+
+    
+    /**
+     * Get the next time jobs will be fired
+     * @return the time in milliseconds
+     * @throws Exception 
+     */
+    @MBeanInfo("get the next time a job is due to be scheduled ")
+    public abstract String getNextScheduleTime() throws Exception;
+    
+    /**
+     * Get all the jobs scheduled to run next
+     * @return a list of jobs that will be scheduled next
+     * @throws Exception
+     */
+    @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
+    public abstract TabularData getNextScheduleJobs() throws Exception;
+    
+    /**
+     * Get all the outstanding Jobs
+     * @return a  table of all jobs
+     * @throws Exception
+
+     */
+    @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
+    public abstract TabularData getAllJobs() throws Exception;
+    
+    /**
+     * Get all outstanding jobs due to run between start and finish
+     * @param start
+     * @param finish
+     * @return a table of jobs in the range
+     * @throws Exception
+
+     */
+    @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
+    public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Sat Feb  6 08:57:24 2010
@@ -28,7 +28,6 @@
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.management.Attribute;
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -41,7 +40,6 @@
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXConnectorServerFactory;
 import javax.management.remote.JMXServiceURL;
-
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,12 +67,12 @@
     private int connectorPort = 1099;
     private int rmiServerPort;
     private String connectorPath = "/jmxrmi";
-    private AtomicBoolean started = new AtomicBoolean(false);
-    private AtomicBoolean connectorStarting = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean connectorStarting = new AtomicBoolean(false);
     private JMXConnectorServer connectorServer;
     private ObjectName namingServiceObjectName;
     private Registry registry;
-    private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
+    private final List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
 
     public ManagementContext() {
         this(null);
@@ -94,6 +92,7 @@
                 } catch (Throwable ignore) {
                 }
                 Thread t = new Thread("JMX connector") {
+                    @Override
                     public void run() {
                         try {
                             JMXConnectorServer server = connectorServer;
@@ -314,6 +313,10 @@
         return getMBeanServer().queryNames(name, query);
     }
     
+    public ObjectInstance getObjectInstance(ObjectName name) throws Exception {
+        return getMBeanServer().getObjectInstance(name);
+    }
+    
     /**
      * Unregister an MBean
      * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Sat Feb  6 08:57:24 2010
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.BODY_LENGTH;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.BODY_PREVIEW;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.CONTENT_MAP;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.JMSXGROUP_ID;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.JMSXGROUP_SEQ;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.MESSAGE_TEXT;
+import static org.apache.activemq.broker.jmx.CompositeDataConstants.ORIGINAL_DESTINATION;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -23,10 +30,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
-import javax.jms.Destination;
 import javax.management.openmbean.ArrayType;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -34,9 +39,9 @@
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.OpenType;
 import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularType;
 import javax.management.openmbean.TabularDataSupport;
-
+import javax.management.openmbean.TabularType;
+import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -44,7 +49,6 @@
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
-import static org.apache.activemq.broker.jmx.CompositeDataConstants.*;
 
 public final class OpenTypeSupport {
 
@@ -54,14 +58,14 @@
         Map<String, Object> getFields(Object o) throws OpenDataException;
     }
 
-    private static final Map<Class, MessageOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, MessageOpenTypeFactory>();
+    private static final Map<Class, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, AbstractOpenTypeFactory>();
 
     abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
 
         private CompositeType compositeType;
-        private List<String> itemNamesList = new ArrayList<String>();
-        private List<String> itemDescriptionsList = new ArrayList<String>();
-        private List<OpenType> itemTypesList = new ArrayList<OpenType>();
+        private final List<String> itemNamesList = new ArrayList<String>();
+        private final List<String> itemDescriptionsList = new ArrayList<String>();
+        private final List<OpenType> itemTypesList = new ArrayList<OpenType>();
 
         public CompositeType getCompositeType() throws OpenDataException {
             if (compositeType == null) {
@@ -109,10 +113,12 @@
         protected TabularType floatPropertyTabularType;
         protected TabularType doublePropertyTabularType;
 
+        @Override
         protected String getTypeName() {
             return ActiveMQMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
             addItem("JMSCorrelationID", "JMSCorrelationID", SimpleType.STRING);
@@ -150,6 +156,7 @@
             addItem(CompositeDataConstants.DOUBLE_PROPERTIES, "User Double Properties", doublePropertyTabularType);
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQMessage m = (ActiveMQMessage)o;
             Map<String, Object> rc = super.getFields(o);
@@ -255,16 +262,19 @@
     static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
 
 
+        @Override
         protected String getTypeName() {
             return ActiveMQBytesMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
             addItem(BODY_LENGTH, "Body length", SimpleType.LONG);
             addItem(BODY_PREVIEW, "Body preview", new ArrayType(1, SimpleType.BYTE));
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQBytesMessage m = (ActiveMQBytesMessage)o;
             Map<String, Object> rc = super.getFields(o);
@@ -298,15 +308,18 @@
 
     static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
 
+        @Override
         protected String getTypeName() {
             return ActiveMQMapMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
             addItem(CONTENT_MAP, "Content map", SimpleType.STRING);
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQMapMessage m = (ActiveMQMapMessage)o;
             Map<String, Object> rc = super.getFields(o);
@@ -320,14 +333,17 @@
     }
 
     static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        @Override
         protected String getTypeName() {
             return ActiveMQObjectMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             Map<String, Object> rc = super.getFields(o);
             return rc;
@@ -335,14 +351,17 @@
     }
 
     static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
+        @Override
         protected String getTypeName() {
             return ActiveMQStreamMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             Map<String, Object> rc = super.getFields(o);
             return rc;
@@ -351,15 +370,18 @@
 
     static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
 
+        @Override
         protected String getTypeName() {
             return ActiveMQTextMessage.class.getName();
         }
 
+        @Override
         protected void init() throws OpenDataException {
             super.init();
             addItem(MESSAGE_TEXT, MESSAGE_TEXT, SimpleType.STRING);
         }
 
+        @Override
         public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQTextMessage m = (ActiveMQTextMessage)o;
             Map<String, Object> rc = super.getFields(o);
@@ -371,6 +393,41 @@
             return rc;
         }
     }
+    
+
+    static class JobOpenTypeFactory extends AbstractOpenTypeFactory {
+
+        @Override
+        protected String getTypeName() {
+            return Job.class.getName();
+        }
+
+        @Override
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("jobId", "jobId", SimpleType.STRING);
+            addItem("cronEntry", "Cron entry", SimpleType.STRING);
+            addItem("start", "start time", SimpleType.STRING);
+            addItem("next", "next time", SimpleType.STRING);
+            addItem("period", "period between jobs", SimpleType.LONG);
+            addItem("repeat", "number of times to repeat", SimpleType.INTEGER);
+        }
+
+        @Override
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
+            Job job = (Job) o;
+            Map<String, Object> rc = super.getFields(o);
+            rc.put("jobId", job.getJobId());
+            rc.put("cronEntry", "" + job.getCronEntry());
+            rc.put("start", job.getStartTime());
+            rc.put("next", job.getNextExecutionTime());
+            rc.put("period", job.getPeriod());
+            rc.put("repeat", job.getRepeat());
+            return rc;
+        }
+    }
+
+
 
     static {
         OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
@@ -379,12 +436,13 @@
         OPEN_TYPE_FACTORIES.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory());
         OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
         OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
+        OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory());
     }
 
     private OpenTypeSupport() {
     }
     
-    public static OpenTypeFactory getFactory(Class<? extends Message> clazz) throws OpenDataException {
+    public static OpenTypeFactory getFactory(Class<?> clazz) throws OpenDataException {
         return OPEN_TYPE_FACTORIES.get(clazz);
     }
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java?rev=907197&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java Sat Feb  6 08:57:24 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+
+public interface Job {
+
+    /**
+     * @return the jobId
+     */
+    public abstract String getJobId();
+
+    /**
+     * @return the repeat
+     */
+    public abstract int getRepeat();
+
+    /**
+     * @return the start
+     */
+    public abstract long getStart();
+
+    /**
+     * @return the period
+     */
+    public abstract long getPeriod();
+    
+    /**
+     * @return the cron entry
+     */
+    public abstract String getCronEntry();
+
+    /**
+     * @return the payload
+     */
+    public abstract byte[] getPayload();
+    
+    /**
+     * Get the start time as a Date time string
+     * @return the date time
+     */
+    public String getStartTime();
+    
+    /**
+     * Get the time the job is next due to execute 
+     * @return the date time
+     */
+    public String getNextExecutionTime();
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/Job.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java?rev=907197&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java Sat Feb  6 08:57:24 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.kahadb.util.ByteSequence;
+
+
+public class JobImpl implements Job {
+    private final JobLocation location;
+    private final byte[] payload;
+    
+    protected JobImpl(JobLocation location,ByteSequence bs) {
+        this.location=location;
+        this.payload = new byte[bs.getLength()];
+        System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
+    }
+
+    public String getJobId() {
+        return this.location.getJobId();
+    }
+
+    public byte[] getPayload() {
+       return this.payload;
+    }
+
+    public long getPeriod() {
+       return this.location.getPeriod();
+    }
+
+    public int getRepeat() {
+       return this.location.getRepeat();
+    }
+
+    public long getStart() {
+       return this.location.getStart();
+    }
+
+    public String getCronEntry() {
+        return this.location.getCronEntry();
+    }
+    
+    
+
+    public String getNextExecutionTime() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public String getStartTime() {
+        return JobImpl.getDateTime(getStart());
+    }
+    
+   public static long getDataTime(String value) throws Exception {
+        DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     
+        Date date = dfm.parse(value);
+        return date.getTime();
+    }
+    
+    public static String getDateTime(long value) {
+        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = new Date(value);
+        return dateFormat.format(date);
+    }
+
+    
+    
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java Sat Feb  6 08:57:24 2010
@@ -30,6 +30,7 @@
     private int repeat;
     private long start;
     private long period;
+    private String cronEntry;
     private final Location location;
 
     public JobLocation(Location location) {
@@ -46,14 +47,19 @@
         this.repeat = in.readInt();
         this.start = in.readLong();
         this.period = in.readLong();
+        this.cronEntry=in.readUTF();
         this.location.readExternal(in);
     }
 
     public void writeExternal(DataOutput out) throws IOException {
         out.writeUTF(this.jobId);
-        out.writeInt(repeat);
-        out.writeLong(start);
-        out.writeLong(period);
+        out.writeInt(this.repeat);
+        out.writeLong(this.start);
+        out.writeLong(this.period);
+        if (this.cronEntry==null) {
+            this.cronEntry="";
+        }
+        out.writeUTF(this.cronEntry);
         this.location.writeExternal(out);
     }
 
@@ -117,6 +123,20 @@
     public void setPeriod(long period) {
         this.period = period;
     }
+    
+    /**
+     * @return the cronEntry
+     */
+    public synchronized String getCronEntry() {
+        return this.cronEntry;
+    }
+
+    /**
+     * @param cronEntry the cronEntry to set
+     */
+    public synchronized void setCronEntry(String cronEntry) {
+        this.cronEntry = cronEntry;
+    }
 
     /**
      * @return the location

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Sat Feb  6 08:57:24 2010
@@ -16,35 +16,37 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import java.io.IOException;
 import java.util.List;
 import org.apache.kahadb.util.ByteSequence;
 
-interface JobScheduler {
+public interface JobScheduler {
 
     /**
      * @return the name of the scheduler
+     * @throws Exception 
      */
-    public abstract String getName();
+    public abstract String getName() throws Exception;
 /**
  * Add a Job listener
  * @param l
+ * @throws Exception 
  */
-    public abstract void addListener(JobListener l);
+    public abstract void addListener(JobListener l) throws Exception;
 /**
  * remove a JobListener
  * @param l
+ * @throws Exception 
  */
-    public abstract void removeListener(JobListener l);
+    public abstract void removeListener(JobListener l) throws Exception;
 
     /**
      * Add a job to be scheduled
      * @param jobId a unique identifier for the job
      * @param payload the message to be sent when the job is scheduled
      * @param delay the time in milliseconds before the job will be run
-     * @throws IOException
+     * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,long delay) throws IOException;
+    public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception;
 
     
     /**
@@ -54,36 +56,68 @@
      * @param start 
      * @param period the time in milliseconds between successive executions of the Job
      * @param repeat the number of times to execute the job - less than 0 will be repeated forever
-     * @throws IOException
+     * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws IOException;
+    public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws Exception;
 
     /**
      * remove all jobs scheduled to run at this time
      * @param time
-     * @throws IOException
+     * @throws Exception 
      */
-    public abstract void remove(long time) throws IOException;
+    public abstract void remove(long time) throws  Exception;
 
     /**
      * remove a job with the matching jobId
      * @param jobId
-     * @throws IOException
+     * @throws Exception 
      */
-    public abstract void remove(String jobId) throws IOException;
-
+    public abstract void remove(String jobId) throws  Exception;
+    
     /**
-     * Get all the jobs scheduled to run next
-     * @return a list of messages that will be scheduled next
-     * @throws IOException
+     * remove all the Jobs from the scheduler
+     * @throws Exception
      */
-    public abstract List<ByteSequence> getNextScheduleJobs() throws IOException;
+    public abstract void removeAllJobs() throws Exception;
+    
+    /**
+     * remove all the Jobs from the scheduler that are due between the start and finish times
+     * @param start time in milliseconds
+     * @param finish time in milliseconds
+     * @throws Exception
+     */
+    public abstract void removeAllJobs(long start,long finish) throws Exception;
+    
+
     
     /**
      * Get the next time jobs will be fired
      * @return the time in milliseconds
-     * @throws IOException 
+     * @throws Exception 
+     */
+    public abstract long getNextScheduleTime() throws Exception;
+    
+    /**
+     * Get all the jobs scheduled to run next
+     * @return a list of jobs that will be scheduled next
+     * @throws Exception
+     */
+    public abstract List<Job> getNextScheduleJobs() throws Exception;
+    
+    /**
+     * Get all the outstanding Jobs
+     * @return a  list of all jobs
+     * @throws Exception 
+     */
+    public abstract List<Job> getAllJobs() throws Exception;
+    
+    /**
+     * Get all outstanding jobs due to run between start and finish
+     * @param start
+     * @param finish
+     * @return a list of jobs
+     * @throws Exception
      */
-    public abstract long getNextScheduleTime() throws IOException;
+    public abstract List<Job> getAllJobs(long start,long finish)throws Exception;
 
 }
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=907197&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Sat Feb  6 08:57:24 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.kahadb.util.ByteSequence;
+
+public class JobSchedulerFacade implements JobScheduler {
+
+    private final SchedulerBroker broker;
+    
+    JobSchedulerFacade(SchedulerBroker broker){
+        this.broker=broker;
+    }
+    public void addListener(JobListener l) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.addListener(l);
+        }
+    }
+
+    public List<Job> getAllJobs() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            return js.getAllJobs();
+        }
+        return Collections.emptyList();
+    }
+
+    public List<Job> getAllJobs(long start, long finish) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            return js.getAllJobs(start,finish);
+        }
+        return Collections.emptyList();
+    }
+
+    public String getName() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            return js.getName();
+        }
+        return "";
+    }
+
+    public List<Job> getNextScheduleJobs() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            return js.getNextScheduleJobs();
+        }
+        return Collections.emptyList();
+    }
+
+    public long getNextScheduleTime() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            return js.getNextScheduleTime();
+        }
+        return 0;
+    }
+
+    public void remove(long time) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.remove(time);
+        }
+    }
+
+    public void remove(String jobId) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.remove(jobId);
+        }
+
+    }
+
+    public void removeAllJobs() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.removeAllJobs();
+        }
+    }
+
+    public void removeAllJobs(long start, long finish) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.removeAllJobs(start,finish);
+        }
+
+    }
+
+    public void removeListener(JobListener l) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.removeListener(l);
+        }
+
+    }
+
+    public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.schedule(jobId, payload, delay);
+        }
+    }
+
+    public void schedule(String jobId, ByteSequence payload, long start, long period, int repeat) throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js !=null) {
+            js.schedule(jobId, payload, start,period,repeat);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java Sat Feb  6 08:57:24 2010
@@ -82,7 +82,6 @@
         this.jobListeners.remove(l);
     }
 
-   
     public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
@@ -91,14 +90,14 @@
         });
     }
 
-   
-    public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, final int repeat) throws IOException {
+    public void schedule(final String jobId, final ByteSequence payload, final long start, final long period,
+            final int repeat) throws IOException {
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
                 schedule(tx, jobId, payload, start, period, repeat);
             }
         });
-       
+
     }
 
     /*
@@ -147,8 +146,8 @@
      * (non-Javadoc)
      * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
      */
-    public synchronized List<ByteSequence> getNextScheduleJobs() throws IOException {
-        final List<ByteSequence> result = new ArrayList<ByteSequence>();
+    public synchronized List<Job> getNextScheduleJobs() throws IOException {
+        final List<Job> result = new ArrayList<Job>();
 
         this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
             public void execute(Transaction tx) throws IOException {
@@ -156,26 +155,90 @@
                 if (first != null) {
                     for (JobLocation jl : first.getValue()) {
                         ByteSequence bs = getJob(jl.getLocation());
-                        result.add(bs);
+                        Job job = new JobImpl(jl, bs);
+                        result.add(job);
+                    }
+                }
+            }
+        });
+        return result;
+    }
+
+    public synchronized List<Job> getAllJobs() throws IOException {
+        final List<Job> result = new ArrayList<Job>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
+                while (iter.hasNext()) {
+                    Map.Entry<Long, List<JobLocation>> next = iter.next();
+                    if (next != null) {
+                        for (JobLocation jl : next.getValue()) {
+                            ByteSequence bs = getJob(jl.getLocation());
+                            Job job = new JobImpl(jl, bs);
+                            result.add(job);
+                        }
+                    } else {
+                        break;
+                    }
+                }
+
+            }
+        });
+        return result;
+    }
+
+    public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException {
+        final List<Job> result = new ArrayList<Job>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
+                while (iter.hasNext()) {
+                    Map.Entry<Long, List<JobLocation>> next = iter.next();
+                    if (next != null && next.getKey().longValue() <= finish) {
+                        for (JobLocation jl : next.getValue()) {
+                            ByteSequence bs = getJob(jl.getLocation());
+                            Job job = new JobImpl(jl, bs);
+                            result.add(job);
+                        }
+                    } else {
+                        break;
                     }
                 }
+
             }
         });
         return result;
     }
 
+    public synchronized void removeAllJobs() throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                destroy(tx);
+            }
+        });
+    }
+
+    public synchronized void removeAllJobs(final long start, final long finish) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                destroy(tx,start,finish);
+            }
+        });
+
+    }
+
     ByteSequence getJob(Location location) throws IllegalStateException, IOException {
         return this.store.getJob(location);
     }
 
-    void schedule(Transaction tx,  String jobId, ByteSequence payload,long start, long period, int repeat)
+    void schedule(Transaction tx, String jobId, ByteSequence payload, long start, long period, int repeat)
             throws IOException {
         List<JobLocation> values = null;
         long startTime;
         long time;
         if (start > 0) {
             time = startTime = start;
-        }else {
+        } else {
             startTime = System.currentTimeMillis();
             time = startTime + period;
         }
@@ -239,15 +302,40 @@
     }
 
     synchronized void destroy(Transaction tx) throws IOException {
+        List<Long> keys = new ArrayList<Long>();
         for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
             Map.Entry<Long, List<JobLocation>> entry = i.next();
+            keys.add(entry.getKey());
             List<JobLocation> values = entry.getValue();
             if (values != null) {
                 for (JobLocation jl : values) {
                     this.store.decrementJournalCount(tx, jl.getLocation());
                 }
             }
+        }
+        for (Long l : keys) {
+            this.index.remove(tx, l);
+        }
+    }
 
+    synchronized void destroy(Transaction tx, long start, long finish) throws IOException {
+        List<Long> keys = new ArrayList<Long>();
+        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
+            Map.Entry<Long, List<JobLocation>> entry = i.next();
+            if (entry.getKey().longValue() <= finish) {
+                keys.add(entry.getKey());
+                List<JobLocation> values = entry.getValue();
+                if (values != null) {
+                    for (JobLocation jl : values) {
+                        this.store.decrementJournalCount(tx, jl.getLocation());
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        for (Long l : keys) {
+            this.index.remove(tx, l);
         }
     }
 
@@ -311,7 +399,7 @@
                                 ByteSequence payload = this.store.getJob(jl.getLocation());
                                 String jobId = jl.getJobId();
                                 long period = jl.getPeriod();
-                                schedule(jobId, payload,0, period, repeat);
+                                schedule(jobId, payload, 0, period, repeat);
                             }
                         }
                         // now remove jobs from this execution time
@@ -408,7 +496,4 @@
             }
         }
     }
-
- 
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Sat Feb  6 08:57:24 2010
@@ -60,6 +60,11 @@
         LOG.info("Scheduler using directory: " + directory);
 
     }
+   
+    public synchronized  JobScheduler getJobScheduler() throws Exception {
+        return new JobSchedulerFacade(this);
+    }
+   
     /**
      * @return the directory
      */
@@ -114,7 +119,7 @@
                 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
             }
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
-            getScheduler().schedule( messageSend.getMessageId().toString(),
+            getInternalScheduler().schedule( messageSend.getMessageId().toString(),
                     new ByteSequence(packet.data, packet.offset, packet.length),start, period, repeat);
 
         } else {
@@ -137,12 +142,15 @@
                     messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
                 }
             }   
+            //Add the jobId as a property
+            messageSend.setProperty("scheduledJobId", id);
            
             //if this goes across a network - we don't want it rescheduled
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START);
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
             
+            
             final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
             producerExchange.setConnectionContext(context);
             producerExchange.setMutable(true);
@@ -153,11 +161,11 @@
         }
 
     }
-
-    private JobScheduler getScheduler() throws Exception {
+    
+    protected synchronized  JobScheduler getInternalScheduler() throws Exception {
         if (this.started.get()) {
             if (this.scheduler == null) {
-                this.scheduler = getStore().getJobScheduler("ActiveMQ");
+                this.scheduler = getStore().getJobScheduler("JMS");
                 this.scheduler.addListener(this);
             }
             return this.scheduler;
@@ -165,6 +173,8 @@
         return null;
     }
 
+    
+
     private JobSchedulerStore getStore() throws Exception {
         if (started.get()) {
             if (this.store == null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Sat Feb  6 08:57:24 2010
@@ -44,7 +44,7 @@
 		for (ByteSequence job:list) {
 		    js.schedule("id:"+(count++), job,startTime,10000,-1);	    
 		}
-		List<ByteSequence>test = js.getNextScheduleJobs();
+		List<Job>test = js.getNextScheduleJobs();
 		assertEquals(list.size(),test.size());
 		store.stop();
 		
@@ -54,7 +54,7 @@
 		assertEquals(list.size(),test.size());
 		for (int i = 0; i < list.size();i++) {
 		    String orig = new String(list.get(i).getData());
-		    String payload = new String(test.get(i).getData());
+		    String payload = new String(test.get(i).getPayload());
 		    assertEquals(orig,payload);
 		}
 	}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=907197&r1=907196&r2=907197&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Sat Feb  6 08:57:24 2010
@@ -19,7 +19,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import java.io.File;
-import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.activemq.util.IOHelper;
@@ -116,7 +116,7 @@
     }
 
     @Test
-    public void testRemoveString() throws IOException {
+    public void testRemoveString() throws Exception {
         final int COUNT = 10;
         final String test = "TESTREMOVE";
         long time = System.currentTimeMillis() + 20000;
@@ -135,6 +135,82 @@
         assertEquals(size, COUNT);
     }
 
+    @Test
+    public void testgetAllJobs() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        final String test = "TEST";
+        long time = System.currentTimeMillis() + 20000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), time, 10 + i, -1);
+        }
+        List<Job> list = scheduler.getAllJobs();
+
+        assertEquals(list.size(), COUNT);
+        int count = 0;
+        for (Job job : list) {
+
+            assertEquals(job.getJobId(), ID + count);
+            count++;
+        }
+    }
+
+    @Test
+    public void testgetAllJobsInRange() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        final String test = "TEST";
+        long start = System.currentTimeMillis() + 10000;
+
+        long time = System.currentTimeMillis() + 20000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            if (i < (COUNT - 2)) {
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0);
+            } else {
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0);
+            }
+        }
+        long finish = start + ((COUNT - 2) * 1000);
+        List<Job> list = scheduler.getAllJobs(start, finish);
+
+        assertEquals(list.size(), COUNT - 2);
+        int count = 0;
+        for (Job job : list) {
+
+            assertEquals(job.getJobId(), ID + count);
+            count++;
+        }
+    }
+
+    @Test
+    public void testRemoveAllJobsInRange() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        final String test = "TEST";
+        long start = System.currentTimeMillis() + 10000;
+
+        long time = System.currentTimeMillis() + 20000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            if (i < (COUNT - 2)) {
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + (i * 1000), 10000 + i, 0);
+            } else {
+                scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), start + start, 10000 + i, 0);
+            }
+        }
+        long finish = start + ((COUNT - 2) * 1000);
+        scheduler.removeAllJobs(start, finish);
+        List<Job> list = scheduler.getAllJobs();
+        assertEquals(list.size(), 2);
+        int count = COUNT - 2;
+        for (Job job : list) {
+            assertEquals(job.getJobId(), ID + count);
+            count++;
+        }
+    }
+
     @Before
     public void setUp() throws Exception {
         File directory = new File("target/test/ScheduledDB");



Mime
View raw message