activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [5/5] git commit: https://issues.apache.org/jira/browse/AMQ-3758
Date Mon, 07 Jul 2014 16:43:58 GMT
https://issues.apache.org/jira/browse/AMQ-3758

Refactor the scheduler store into a more KahaDB style store that can
recover from various problems like missing journal files or corruption
as well as rebuild its index when needed.  Move the scheduler store into
a more configurable style that allows for users to plug in their own
implementations.  Store update from legacy versions is automatic.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74846bb2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74846bb2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74846bb2

Branch: refs/heads/trunk
Commit: 74846bb2b40e6183f2accdcab388c2d309c1eaad
Parents: aa79c7e
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jul 7 12:28:11 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jul 7 12:28:11 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   25 +-
 .../activemq/broker/jmx/JobSchedulerView.java   |   56 +-
 .../broker/jmx/JobSchedulerViewMBean.java       |  113 +-
 .../apache/activemq/broker/scheduler/Job.java   |   23 +-
 .../activemq/broker/scheduler/JobListener.java  |   16 +-
 .../activemq/broker/scheduler/JobScheduler.java |   33 +-
 .../broker/scheduler/JobSchedulerFacade.java    |    6 +
 .../broker/scheduler/JobSchedulerStore.java     |   43 +
 .../activemq/broker/scheduler/JobSupport.java   |    5 +-
 .../activemq/store/PersistenceAdapter.java      |  119 +-
 .../store/memory/MemoryPersistenceAdapter.java  |   36 +-
 .../java/org/apache/activemq/util/IOHelper.java |   68 +-
 .../store/jdbc/JDBCPersistenceAdapter.java      |    7 +
 .../journal/JournalPersistenceAdapter.java      |   71 +-
 .../store/kahadb/AbstractKahaDBMetaData.java    |   57 +
 .../store/kahadb/AbstractKahaDBStore.java       |  745 ++++++++++++
 .../activemq/store/kahadb/KahaDBMetaData.java   |  135 +++
 .../store/kahadb/KahaDBPersistenceAdapter.java  |   15 +-
 .../activemq/store/kahadb/KahaDBStore.java      |   55 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |   56 +-
 .../kahadb/MultiKahaDBTransactionStore.java     |   18 +-
 .../activemq/store/kahadb/TempKahaDBStore.java  |  138 ++-
 .../apache/activemq/store/kahadb/Visitor.java   |   20 +
 .../store/kahadb/scheduler/JobImpl.java         |   21 +-
 .../store/kahadb/scheduler/JobLocation.java     |   77 +-
 .../scheduler/JobLocationsMarshaller.java       |   53 +
 .../kahadb/scheduler/JobSchedulerImpl.java      |  837 ++++++++------
 .../scheduler/JobSchedulerKahaDBMetaData.java   |  246 ++++
 .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
 .../scheduler/UnknownStoreVersionException.java |   24 +
 .../kahadb/scheduler/legacy/LegacyJobImpl.java  |   72 ++
 .../scheduler/legacy/LegacyJobLocation.java     |  296 +++++
 .../legacy/LegacyJobSchedulerImpl.java          |  222 ++++
 .../legacy/LegacyJobSchedulerStoreImpl.java     |  378 ++++++
 .../scheduler/legacy/LegacyStoreReplayer.java   |  155 +++
 .../src/main/proto/journal-data.proto           |   61 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |    5 +
 .../leveldb/replicated/ProxyLevelDBStore.scala  |    5 +
 .../JobSchedulerBrokerShutdownTest.java         |    1 +
 .../JobSchedulerJmxManagementTests.java         |  155 +++
 .../scheduler/JobSchedulerManagementTest.java   |   84 +-
 .../JobSchedulerStoreCheckpointTest.java        |  125 ++
 .../broker/scheduler/JobSchedulerStoreTest.java |   46 +-
 .../broker/scheduler/JobSchedulerTest.java      |   36 +
 .../scheduler/JobSchedulerTestSupport.java      |  112 ++
 .../KahaDBSchedulerIndexRebuildTest.java        |  179 +++
 .../KahaDBSchedulerMissingJournalLogsTest.java  |  204 ++++
 .../scheduler/SchedulerDBVersionTest.java       |  164 +++
 .../src/test/resources/log4j.properties         |    1 +
 .../activemq/store/schedulerDB/legacy/db-1.log  |  Bin 0 -> 524288 bytes
 .../store/schedulerDB/legacy/scheduleDB.data    |  Bin 0 -> 20480 bytes
 .../store/schedulerDB/legacy/scheduleDB.redo    |  Bin 0 -> 16408 bytes
 52 files changed, 5584 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 044af08..1ff4a21 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -1868,6 +1868,23 @@ public class BrokerService implements Service {
 
             try {
                 PersistenceAdapter pa = getPersistenceAdapter();
+                if (pa != null) {
+                    this.jobSchedulerStore = pa.createJobSchedulerStore();
+                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                    configureService(jobSchedulerStore);
+                    jobSchedulerStore.start();
+                    return this.jobSchedulerStore;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (UnsupportedOperationException ex) {
+                // It's ok if the store doesn't implement a scheduler.
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
                 if (pa != null && pa instanceof JobSchedulerStore) {
                     this.jobSchedulerStore = (JobSchedulerStore) pa;
                     configureService(jobSchedulerStore);
@@ -1877,9 +1894,13 @@ public class BrokerService implements Service {
                 throw new RuntimeException(e);
             }
 
+            // Load the KahaDB store as a last resort, this only works if KahaDB is
+            // included at runtime, otherwise this will fail.  User should disable
+            // scheduler support if this fails.
             try {
-                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
-                jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+                jobSchedulerStore = adaptor.createJobSchedulerStore();
                 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
                 configureService(jobSchedulerStore);
                 jobSchedulerStore.start();

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 9e5a1fb..2118a96 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -16,23 +16,39 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.util.List;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSupport;
 
-import javax.management.openmbean.*;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * MBean object that can be used to manage a single instance of a JobScheduler.  The object
+ * provides methods for querying for jobs and removing some or all of the jobs that are
+ * scheduled in the managed store.
+ */
 public class JobSchedulerView implements JobSchedulerViewMBean {
 
     private final JobScheduler jobScheduler;
 
+    /**
+     * Creates a new instance of the JobScheduler management MBean.
+     *
+     * @param jobScheduler
+     *        The scheduler instance to manage.
+     */
     public JobSchedulerView(JobScheduler jobScheduler) {
         this.jobScheduler = jobScheduler;
     }
 
+    @Override
     public TabularData getAllJobs() throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public TabularData getNextScheduleJobs() throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public String getNextScheduleTime() throws Exception {
         long time = this.jobScheduler.getNextScheduleTime();
         return JobSupport.getDateTime(time);
     }
 
+    @Override
     public void removeAllJobs() throws Exception {
         this.jobScheduler.removeAllJobs();
-
     }
 
+    @Override
     public void removeAllJobs(String startTime, String finishTime) throws Exception {
         long start = JobSupport.getDataTime(startTime);
         long finish = JobSupport.getDataTime(finishTime);
         this.jobScheduler.removeAllJobs(start, finish);
+    }
 
+    @Override
+    public void removeAllJobsAtScheduledTime(String time) throws Exception {
+        long removeAtTime = JobSupport.getDataTime(time);
+        this.jobScheduler.remove(removeAtTime);
     }
 
+    @Override
+    public void removeJobAtScheduledTime(String time) throws Exception {
+        removeAllJobsAtScheduledTime(time);
+    }
+
+    @Override
     public void removeJob(String jobId) throws Exception {
         this.jobScheduler.remove(jobId);
-
     }
 
-    public void removeJobAtScheduledTime(String time) throws IOException {
-        // TODO Auto-generated method stub
+    @Override
+    public int getExecutionCount(String jobId) throws Exception {
+        int result = 0;
 
-    }
+        List<Job> jobs = this.jobScheduler.getAllJobs();
+        for (Job job : jobs) {
+            if (job.getJobId().equals(jobId)) {
+                result = job.getExecutionCount();
+            }
+        }
 
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index f5745ea..76a7926 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
 
 import javax.management.openmbean.TabularData;
 
-
-
 public interface JobSchedulerViewMBean {
+
     /**
-     * remove all jobs scheduled to run at this time
+     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
+     * at the given time this methods returns without making any modifications to the
+     * scheduler store.
+     *
      * @param time
-     * @throws Exception
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     *
+     * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what
+     *             the method is actually doing.
      */
+    @Deprecated
     @MBeanInfo("remove jobs with matching execution time")
     public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
 
     /**
-     * remove a job with the matching jobId
+     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
+     * at the given time this methods returns without making any modifications to the
+     * scheduler store.
+     *
+     * @param time
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     */
+    @MBeanInfo("remove jobs with matching execution time")
+    public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
+
+    /**
+     * Remove a job with the matching jobId.  If the method does not find a matching job
+     * then it returns without throwing an error or making any modifications to the job
+     * scheduler store.
+     *
      * @param jobId
-     * @throws Exception
+     *        the Job Id to remove from the scheduler store.
+     *
+     * @throws Exception if an error occurs while attempting to remove the Job.
      */
     @MBeanInfo("remove jobs with matching jobId")
     public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
-    
+
     /**
-     * remove all the Jobs from the scheduler
-     * @throws Exception
+     * Remove all the Jobs from the scheduler,
+     *
+     * @throws Exception if an error occurs while purging the store.
      */
     @MBeanInfo("remove all scheduled jobs")
     public abstract void removeAllJobs() throws Exception;
-    
+
     /**
-     * remove all the Jobs from the scheduler that are due between the start and finish times
-     * @param start time 
-     * @param finish time
-     * @throws Exception
+     * Remove all the Jobs from the scheduler that are due between the start and finish times.
+     *
+     * @param start
+     *        the starting time to remove jobs from.
+     * @param finish
+     *        the finish time for the remove operation.
+     *
+     * @throws Exception if an error occurs while attempting to remove the jobs.
      */
     @MBeanInfo("remove all scheduled jobs between time ranges ")
     public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
-    
 
-    
     /**
-     * Get the next time jobs will be fired
-     * @return the time in milliseconds
-     * @throws Exception 
+     * Get the next time jobs will be fired from this scheduler store.
+     *
+     * @return the time in milliseconds of the next job to execute.
+     *
+     * @throws Exception if an error occurs while accessing the store.
      */
     @MBeanInfo("get the next time a job is due to be scheduled ")
     public abstract String getNextScheduleTime() throws Exception;
-    
+
+    /**
+     * Gets the number of times a scheduled Job has been executed.
+     *
+     * @return the total number of time a scheduled job has executed.
+     *
+     * @throws Exception if an error occurs while querying for the Job.
+     */
+    @MBeanInfo("get the next time a job is due to be scheduled ")
+    public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception;
+
     /**
-     * Get all the jobs scheduled to run next
+     * Get all the jobs scheduled to run next.
+     *
      * @return a list of jobs that will be scheduled next
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while reading the scheduler store.
      */
     @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
     public abstract TabularData getNextScheduleJobs() throws Exception;
-    
-    /**
-     * Get all the outstanding Jobs
-     * @return a  table of all jobs
-     * @throws Exception
 
+    /**
+     * Get all the outstanding Jobs that are scheduled in this scheduler store.
+     *
+     * @return a table of all jobs in this scheduler store.
+     *
+     * @throws Exception if an error occurs while reading the store.
      */
     @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
     public abstract TabularData getAllJobs() throws Exception;
-    
+
     /**
-     * Get all outstanding jobs due to run between start and finish
+     * Get all outstanding jobs due to run between start and finish time range.
+     *
      * @param start
+     *        the starting time range to query the store for jobs.
      * @param finish
-     * @return a table of jobs in the range
-     * @throws Exception
-
+     *        the ending time of this query for scheduled jobs.
+     *
+     * @return a table of jobs in the range given.
+     *
+     * @throws Exception if an error occurs while querying the scheduler store.
      */
     @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
     public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 7b28a5b..047fe23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -16,7 +16,12 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-
+/**
+ * Interface for a scheduled Job object.
+ *
+ * Each Job is identified by a unique Job Id which can be used to reference the Job
+ * in the Job Scheduler store for updates or removal.
+ */
 public interface Job {
 
     /**
@@ -38,11 +43,12 @@ public interface Job {
      * @return the Delay
      */
     public abstract long getDelay();
+
     /**
      * @return the period
      */
     public abstract long getPeriod();
-    
+
     /**
      * @return the cron entry
      */
@@ -52,17 +58,24 @@ public interface Job {
      * @return the payload
      */
     public abstract byte[] getPayload();
-    
+
     /**
      * Get the start time as a Date time string
      * @return the date time
      */
     public String getStartTime();
-    
+
     /**
-     * Get the time the job is next due to execute 
+     * Get the time the job is next due to execute
      * @return the date time
      */
     public String getNextExecutionTime();
 
+    /**
+     * Gets the total number of times this job has executed.
+     *
+     * @returns the number of times this job has been executed.
+     */
+    public int getExecutionCount();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
index c53d9c6..a453595 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
@@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
 
 import org.apache.activemq.util.ByteSequence;
 
+/**
+ * Job event listener interface. Provides event points for Job related events
+ * such as job ready events.
+ */
 public interface JobListener {
-    
+
     /**
-     * A Job that has been scheduled is now ready 
-     * @param id
+     * A Job that has been scheduled is now ready to be fired.  The Job is passed
+     * in its raw byte form and must be un-marshaled before being delivered.
+     *
+     * @param jobId
+     *        The unique Job Id of the Job that is ready to fire.
      * @param job
+     *        The job that is now ready, delivered in byte form.
      */
-    public void scheduledJob(String id,ByteSequence job);
+    public void scheduledJob(String id, ByteSequence job);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
index 2e96eae..e951861 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
@@ -46,20 +46,25 @@ public interface JobScheduler {
     void stopDispatching() throws Exception;
 
     /**
-     * Add a Job listener
+     * Add a Job listener which will receive events related to scheduled jobs.
+     *
+     * @param listener
+     *      The job listener to add.
      *
-     * @param l
      * @throws Exception
      */
-    void addListener(JobListener l) throws Exception;
+    void addListener(JobListener listener) throws Exception;
 
     /**
-     * remove a JobListener
+     * remove a JobListener that was previously registered.  If the given listener is not in
+     * the registry this method has no effect.
+     *
+     * @param listener
+     *      The listener that should be removed from the listener registry.
      *
-     * @param l
      * @throws Exception
      */
-    void removeListener(JobListener l) throws Exception;
+    void removeListener(JobListener listener) throws Exception;
 
     /**
      * Add a job to be scheduled
@@ -70,7 +75,8 @@ public interface JobScheduler {
      *            the message to be sent when the job is scheduled
      * @param delay
      *            the time in milliseconds before the job will be run
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
      */
     void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
 
@@ -82,8 +88,9 @@ public interface JobScheduler {
      * @param payload
      *            the message to be sent when the job is scheduled
      * @param cronEntry
-     *            - cron entry
-     * @throws Exception
+     *            The cron entry to use to schedule this job.
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
      */
     void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
 
@@ -95,7 +102,7 @@ public interface JobScheduler {
      * @param payload
      *            the message to be sent when the job is scheduled
      * @param cronEntry
-     *            - cron entry
+     *            cron entry
      * @param delay
      *            time in ms to wait before scheduling
      * @param period
@@ -110,6 +117,8 @@ public interface JobScheduler {
      * remove all jobs scheduled to run at this time
      *
      * @param time
+     *      The UTC time to use to remove a batch of scheduled Jobs.
+     *
      * @throws Exception
      */
     void remove(long time) throws Exception;
@@ -118,7 +127,9 @@ public interface JobScheduler {
      * remove a job with the matching jobId
      *
      * @param jobId
-     * @throws Exception
+     *      The unique Job Id to search for and remove from the scheduled set of jobs.
+     *
+     * @throws Exception if an error occurs while removing the Job.
      */
     void remove(String jobId) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
index d46d04a..24a216a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
@@ -21,6 +21,12 @@ import java.util.List;
 
 import org.apache.activemq.util.ByteSequence;
 
+/**
+ * A wrapper for instances of the JobScheduler interface that ensures that methods
+ * provides safe and sane return values and can deal with null values being passed
+ * in etc.  Provides a measure of safety when using unknown implementations of the
+ * JobSchedulerStore which might not always do the right thing.
+ */
 public class JobSchedulerFacade implements JobScheduler {
 
     private final SchedulerBroker broker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
index 3cbc367..c6863c7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
@@ -26,13 +26,56 @@ import org.apache.activemq.Service;
  */
 public interface JobSchedulerStore extends Service {
 
+    /**
+     * Gets the location where the Job Scheduler will write the persistent data used
+     * to preserve and recover scheduled Jobs.
+     *
+     * If the scheduler implementation does not utilize a file system based store this
+     * method returns null.
+     *
+     * @return the directory where persistent store data is written.
+     */
     File getDirectory();
 
+    /**
+     * Sets the directory where persistent store data will be written.  This method
+     * must be called before the scheduler store is started to have any effect.
+     *
+     * @param directory
+     *      The directory where the job scheduler store is to be located.
+     */
     void setDirectory(File directory);
 
+    /**
+     * The size of the current store on disk if the store utilizes a disk based store
+     * mechanism.
+     *
+     * @return the current store size on disk.
+     */
     long size();
 
+    /**
+     * Returns the JobScheduler instance identified by the given name.
+     *
+     * @param name
+     *        the name of the JobScheduler instance to lookup.
+     *
+     * @return the named JobScheduler or null if none exists with the given name.
+     *
+     * @throws Exception if an error occurs while loading the named scheduler.
+     */
     JobScheduler getJobScheduler(String name) throws Exception;
 
+    /**
+     * Removes the named JobScheduler if it exists, purging all scheduled messages
+     * assigned to it.
+     *
+     * @param name
+     *        the name of the scheduler instance to remove.
+     *
+     * @return true if there was a scheduler with the given name to remove.
+     *
+     * @throws Exception if an error occurs while removing the scheduler.
+     */
     boolean removeJobScheduler(String name) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
index 6b78d77..fc5b8dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
@@ -20,7 +20,11 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
+/**
+ * A class to provide common Job Scheduler related methods.
+ */
 public class JobSupport {
+
     public static String getDateTime(long value) {
         DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         Date date = new Date(value);
@@ -32,5 +36,4 @@ public class JobSupport {
          Date date = dfm.parse(value);
          return date.getTime();
      }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 31efd32..01a9634 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Adapter to the actual persistence mechanism used with ActiveMQ
  *
- * 
+ *
  */
 public interface PersistenceAdapter extends Service {
 
     /**
-     * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
-     * objects that the persistence store is aware exist.
+     * Returns a set of all the
+     * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
+     * persistence store is aware exist.
      *
      * @return active destinations
      */
     Set<ActiveMQDestination> getDestinations();
 
     /**
-     * Factory method to create a new queue message store with the given destination name
+     * Factory method to create a new queue message store with the given
+     * destination name
+     *
      * @param destination
      * @return the message store
-     * @throws IOException 
+     * @throws IOException
      */
     MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
 
     /**
-     * Factory method to create a new topic message store with the given destination name
-     * @param destination 
+     * Factory method to create a new topic message store with the given
+     * destination name
+     *
+     * @param destination
      * @return the topic message store
-     * @throws IOException 
+     * @throws IOException
      */
     TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
 
     /**
+     * Creates and returns a new Job Scheduler store instance.
+     *
+     * @return a new JobSchedulerStore instance if this Persistence adapter provides its own.
+     *
+     * @throws IOException If an error occurs while creating the new JobSchedulerStore.
+     * @throws UnsupportedOperationException If this adapter does not provide its own
+     *                                       scheduler store implementation.
+     */
+    JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException;
+
+    /**
      * Cleanup method to remove any state associated with the given destination.
      * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
      */
     void removeQueueMessageStore(ActiveMQQueue destination);
 
     /**
      * Cleanup method to remove any state associated with the given destination
      * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
      */
     void removeTopicMessageStore(ActiveMQTopic destination);
 
     /**
-     * Factory method to create a new persistent prepared transaction store for XA recovery
+     * Factory method to create a new persistent prepared transaction store for
+     * XA recovery
+     *
      * @return transaction store
-     * @throws IOException 
+     * @throws IOException
      */
     TransactionStore createTransactionStore() throws IOException;
 
     /**
-     * This method starts a transaction on the persistent storage - which is nothing to
-     * do with JMS or XA transactions - its purely a mechanism to perform multiple writes
-     * to a persistent store in 1 transaction as a performance optimization.
+     * This method starts a transaction on the persistent storage - which is
+     * nothing to do with JMS or XA transactions - its purely a mechanism to
+     * perform multiple writes to a persistent store in 1 transaction as a
+     * performance optimization.
      * <p/>
-     * Typically one transaction will require one disk synchronization point and so for
-     * real high performance its usually faster to perform many writes within the same
-     * transaction to minimize latency caused by disk synchronization. This is especially
-     * true when using tools like Berkeley Db or embedded JDBC servers.
-     * @param context 
-     * @throws IOException 
+     * Typically one transaction will require one disk synchronization point and
+     * so for real high performance its usually faster to perform many writes
+     * within the same transaction to minimize latency caused by disk
+     * synchronization. This is especially true when using tools like Berkeley
+     * Db or embedded JDBC servers.
+     *
+     * @param context
+     * @throws IOException
      */
     void beginTransaction(ConnectionContext context) throws IOException;
 
-
     /**
      * Commit a persistence transaction
-     * @param context 
-     * @throws IOException 
+     *
+     * @param context
+     * @throws IOException
      *
      * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
@@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
 
     /**
      * Rollback a persistence transaction
-     * @param context 
-     * @throws IOException 
+     *
+     * @param context
+     * @throws IOException
      *
      * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
     void rollbackTransaction(ConnectionContext context) throws IOException;
-    
+
     /**
-     * 
+     *
      * @return last broker sequence
      * @throws IOException
      */
     long getLastMessageBrokerSequenceId() throws IOException;
-    
+
     /**
      * Delete's all the messages in the persistent store.
-     * 
+     *
      * @throws IOException
      */
     void deleteAllMessages() throws IOException;
-        
+
     /**
-     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     * @param usageManager
+     *            The UsageManager that is controlling the broker's memory
+     *            usage.
      */
     void setUsageManager(SystemUsage usageManager);
-    
+
     /**
      * Set the name of the broker using the adapter
+     *
      * @param brokerName
      */
     void setBrokerName(String brokerName);
-    
+
     /**
      * Set the directory where any data files should be created
+     *
      * @param dir
      */
     void setDirectory(File dir);
@@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
      * @return the directory used by the persistence adaptor
      */
     File getDirectory();
-    
+
     /**
      * checkpoint any
-     * @param sync 
-     * @throws IOException 
+     *
+     * @param sync
+     * @throws IOException
      *
      */
     void checkpoint(boolean sync) throws IOException;
-    
+
     /**
      * A hint to return the size of the store on disk
+     *
      * @return disk space used in bytes of 0 if not implemented
      */
     long size();
 
     /**
-     * return the last stored producer sequenceId for this producer Id
-     * used to suppress duplicate sends on failover reconnect at the transport
-     * when a reconnect occurs
-     * @param id the producerId to find a sequenceId for
+     * return the last stored producer sequenceId for this producer Id used to
+     * suppress duplicate sends on failover reconnect at the transport when a
+     * reconnect occurs
+     *
+     * @param id
+     *            the producerId to find a sequenceId for
      * @return the last stored sequence id or -1 if no suppression needed
      */
     long getLastProducerSequenceId(ProducerId id) throws IOException;

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 0fd6bfc..73ea104 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class MemoryPersistenceAdapter implements PersistenceAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
     private boolean useExternalMessageReferences;
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
         for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
@@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return new MemoryPersistenceAdapter();
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         MessageStore rc = queues.get(destination);
         if (rc == null) {
@@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return rc;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
         TopicMessageStore rc = topics.get(destination);
         if (rc == null) {
@@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         queues.remove(destination);
     }
@@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
             transactionStore = new MemoryTransactionStore(this);
@@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return transactionStore;
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void start() throws Exception {
     }
 
+    @Override
     public void stop() throws Exception {
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
             MemoryMessageStore store = asMemoryMessageStore(iter.next());
@@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      * @param usageManager The UsageManager that is controlling the broker's
      *                memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
+    @Override
     public String toString() {
         return "MemoryPersistenceAdapter";
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
     }
 
+    @Override
     public void setDirectory(File dir) {
     }
-    
+
+    @Override
     public File getDirectory(){
         return null;
     }
 
+    @Override
     public void checkpoint(boolean sync) throws IOException {
     }
-    
+
+    @Override
     public long size(){
         return 0;
     }
-    
+
     public void setCreateTransactionStore(boolean create) throws IOException {
         if (create) {
             createTransactionStore();
         }
     }
 
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         // memory map does duplicate suppression
         return -1;
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        // We could eventuall implement an in memory scheduler.
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
index a623de9..2a70194 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -61,8 +61,9 @@ public final class IOHelper {
     }
 
     /**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+     * Converts any string into a string that is safe to use as a file name. The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
      *
      * @param name
      * @return
@@ -76,15 +77,16 @@ public final class IOHelper {
     }
 
     /**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+     * Converts any string into a string that is safe to use as a file name. The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
      *
      * @param name
      * @param dirSeparators
      * @param maxFileLength
      * @return
      */
-    public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
+    public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
         int size = name.length();
         StringBuffer rc = new StringBuffer(size * 2);
         for (int i = 0; i < size; i++) {
@@ -92,8 +94,7 @@ public final class IOHelper {
             boolean valid = c >= 'a' && c <= 'z';
             valid = valid || (c >= 'A' && c <= 'Z');
             valid = valid || (c >= '0' && c <= '9');
-            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
-                    ||(dirSeparators && ( (c == '/') || (c == '\\')));
+            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\')));
 
             if (valid) {
                 rc.append(c);
@@ -105,7 +106,7 @@ public final class IOHelper {
         }
         String result = rc.toString();
         if (result.length() > maxFileLength) {
-            result = result.substring(result.length()-maxFileLength,result.length());
+            result = result.substring(result.length() - maxFileLength, result.length());
         }
         return result;
     }
@@ -168,8 +169,7 @@ public final class IOHelper {
             } else {
                 for (int i = 0; i < files.length; i++) {
                     File file = files[i];
-                    if (file.getName().equals(".")
-                            || file.getName().equals("..")) {
+                    if (file.getName().equals(".") || file.getName().equals("..")) {
                         continue;
                     }
                     if (file.isDirectory()) {
@@ -190,6 +190,27 @@ public final class IOHelper {
         }
     }
 
+    public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
+        if (!srcDirectory.isDirectory()) {
+            throw new IOException("source is not a directory");
+        }
+
+        if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
+            throw new IOException("target exists and is not a directory");
+        } else {
+            mkdirs(targetDirectory);
+        }
+
+        List<File> filesToMove = new ArrayList<File>();
+        getFiles(srcDirectory, filesToMove, filter);
+
+        for (File file : filesToMove) {
+            if (!file.isDirectory()) {
+                moveFile(file, targetDirectory);
+            }
+        }
+    }
+
     public static void copyFile(File src, File dest) throws IOException {
         copyFile(src, dest, null);
     }
@@ -222,32 +243,32 @@ public final class IOHelper {
         File parent = src.getParentFile();
         String fromPath = from.getAbsolutePath();
         if (parent.getAbsolutePath().equals(fromPath)) {
-            //one level down
+            // one level down
             result = to;
-        }else {
+        } else {
             String parentPath = parent.getAbsolutePath();
             String path = parentPath.substring(fromPath.length());
-            result = new File(to.getAbsolutePath()+File.separator+path);
+            result = new File(to.getAbsolutePath() + File.separator + path);
         }
         return result;
     }
 
-    static List<File> getFiles(File dir,FilenameFilter filter){
+    static List<File> getFiles(File dir, FilenameFilter filter) {
         List<File> result = new ArrayList<File>();
-        getFiles(dir,result,filter);
+        getFiles(dir, result, filter);
         return result;
     }
 
-    static void getFiles(File dir,List<File> list,FilenameFilter filter) {
+    static void getFiles(File dir, List<File> list, FilenameFilter filter) {
         if (!list.contains(dir)) {
             list.add(dir);
-            String[] fileNames=dir.list(filter);
-            for (int i =0; i < fileNames.length;i++) {
-                File f = new File(dir,fileNames[i]);
+            String[] fileNames = dir.list(filter);
+            for (int i = 0; i < fileNames.length; i++) {
+                File f = new File(dir, fileNames[i]);
                 if (f.isFile()) {
                     list.add(f);
-                }else {
-                    getFiles(dir,list,filter);
+                } else {
+                    getFiles(dir, list, filter);
                 }
             }
         }
@@ -286,12 +307,13 @@ public final class IOHelper {
     public static void mkdirs(File dir) throws IOException {
         if (dir.exists()) {
             if (!dir.isDirectory()) {
-                throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
+                throw new IOException("Failed to create directory '" + dir +
+                                      "', regular file already existed with that name");
             }
 
         } else {
             if (!dir.mkdirs()) {
-                throw new IOException("Failed to create directory '" + dir+"'");
+                throw new IOException("Failed to create directory '" + dir + "'");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 7ff4ae0..a3a8250 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         this.lockDataSource = dataSource;
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
@@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         }
         return result;
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 565fc9f..cc5282f 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
@@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
  * other long term persistent storage.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
 
     private BrokerService brokerService;
-	
+
     protected Scheduler scheduler;
     private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
 
@@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     private TaskRunnerFactory taskRunnerFactory;
     private File directory;
 
-    public JournalPersistenceAdapter() {        
+    public JournalPersistenceAdapter() {
     }
-    
+
     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
         setJournal(journal);
         setTaskRunnerFactory(taskRunnerFactory);
@@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         this.journal = journal;
         journal.setJournalEventListener(this);
     }
-    
+
     public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
         this.longTermPersistence = longTermPersistence;
     }
-    
+
     final Runnable createPeriodicCheckpointTask() {
         return new Runnable() {
+            @Override
             public void run() {
                 long lastTime = 0;
                 synchronized (this) {
@@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
         this.usageManager = usageManager;
         longTermPersistence.setUsageManager(usageManager);
     }
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
         destinations.addAll(queues.keySet());
@@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         JournalMessageStore store = queues.get(destination);
         if (store == null) {
@@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return store;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
         JournalTopicMessageStore store = topics.get(destinationName);
         if (store == null) {
@@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         queues.remove(destination);
     }
@@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         return transactionStore;
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return longTermPersistence.getLastMessageBrokerSequenceId();
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.beginTransaction(context);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.commitTransaction(context);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.rollbackTransaction(context);
     }
 
+    @Override
     public synchronized void start() throws Exception {
         if (!started.compareAndSet(false, true)) {
             return;
@@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
 
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
+            @Override
             public boolean iterate() {
                 return doCheckpoint();
             }
         }, "ActiveMQ Journal Checkpoint Worker");
 
         checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            @Override
             public Thread newThread(Runnable runable) {
                 Thread t = new Thread(runable, "Journal checkpoint worker");
                 t.setPriority(7);
@@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
 
     }
 
+    @Override
     public void stop() throws Exception {
 
         this.usageManager.getMemoryUsage().removeUsageListener(this);
@@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     /**
      * The Journal give us a call back so that we can move old data out of the
      * journal. Taking a checkpoint does this for us.
-     * 
+     *
      * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
      */
+    @Override
     public void overflowNotification(RecordLocation safeLocation) {
         checkpoint(false, true);
     }
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * 
+     *
      */
     public void checkpoint(boolean sync, boolean fullCheckpoint) {
         try {
@@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public void checkpoint(boolean sync) {
         checkpoint(sync, sync);
     }
 
     /**
      * This does the actual checkpoint.
-     * 
+     *
      * @return
      */
     public boolean doCheckpoint() {
@@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
             // We do many partial checkpoints (fullCheckpoint==false) to move
             // topic messages
             // to long term store as soon as possible.
-            // 
+            //
             // We want to avoid doing that for queue messages since removes the
             // come in the same
             // checkpoint cycle will nullify the previous message add.
@@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
                     try {
                         final JournalMessageStore ms = iterator.next();
                         FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                            @Override
                             public RecordLocation call() throws Exception {
                                 return ms.checkpoint();
                             }
@@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
                 try {
                     final JournalTopicMessageStore ms = iterator.next();
                     FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                        @Override
                         public RecordLocation call() throws Exception {
                             return ms.checkpoint();
                         }
@@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
-     * 
+     *
      * @throws IOException
      * @throws IOException
      * @throws InvalidRecordLocationException
@@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
         if (started.get()) {
             try {
-        	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
+                return journal.write(toPacket(wireFormat.marshal(command)), sync);
             } catch (IOException ioe) {
-        	    LOG.error("Cannot write to the journal", ioe);
-        	    brokerService.handleIOException(ioe);
-        	    throw ioe;
+                LOG.error("Cannot write to the journal", ioe);
+                brokerService.handleIOException(ioe);
+                throw ioe;
             }
         }
         throw new IOException("closed");
@@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return writeCommand(trace, sync);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         newPercentUsage = (newPercentUsage / 10) * 10;
         oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return transactionStore;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         try {
             JournalTrace trace = new JournalTrace();
@@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
         longTermPersistence.setBrokerName(brokerName);
     }
@@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return "JournalPersistenceAdapter(" + longTermPersistence + ")";
     }
 
+    @Override
     public void setDirectory(File dir) {
         this.directory=dir;
     }
-    
+
+    @Override
     public File getDirectory(){
         return directory;
     }
-    
+
+    @Override
     public long size(){
         return 0;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
         PersistenceAdapter pa = getLongTermPersistence();
@@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         return -1;
     }
 
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return longTermPersistence.createJobSchedulerStore();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
new file mode 100644
index 0000000..edb2750
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+
+public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
+
+    private int state;
+    private Location lastUpdateLocation;
+    private Page<T> page;
+
+    @Override
+    public Page<T> getPage() {
+        return page;
+    }
+
+    @Override
+    public int getState() {
+        return state;
+    }
+
+    @Override
+    public Location getLastUpdateLocation() {
+        return lastUpdateLocation;
+    }
+
+    @Override
+    public void setPage(Page<T> page) {
+        this.page = page;
+    }
+
+    @Override
+    public void setState(int value) {
+        this.state = value;
+    }
+
+    @Override
+    public void setLastUpdateLocation(Location location) {
+        this.lastUpdateLocation = location;
+    }
+}


Mime
View raw message