activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/5] https://issues.apache.org/jira/browse/AMQ-3758
Date Mon, 07 Jul 2014 16:43:54 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk aa79c7ec7 -> 74846bb2b


http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
new file mode 100644
index 0000000..92563f4
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.scheduler.legacy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to upgrade a legacy Job Scheduler store to the latest version. This class
+ * loads a found legacy scheduler store and generates new add commands for all
+ * jobs currently in the store.
+ */
+public class LegacyStoreReplayer {
+
+    static final Logger LOG = LoggerFactory.getLogger(LegacyStoreReplayer.class);
+
+    private LegacyJobSchedulerStoreImpl store;
+    private final File legacyStoreDirectory;
+
+    /**
+     * Creates a new Legacy Store Replayer for the given legacy store directory.
+     *
+     * @param directory the directory that contains the legacy scheduler store.
+     */
+    public LegacyStoreReplayer(File directory) {
+        this.legacyStoreDirectory = directory;
+    }
+
+    /**
+     * Loads the legacy store and prepares it for replay into a newer Store instance.
+     *
+     * @throws IOException if an error occurs while reading in the legacy store.
+     */
+    public void load() throws IOException {
+
+        store = new LegacyJobSchedulerStoreImpl();
+        store.setDirectory(legacyStoreDirectory);
+        store.setFailIfDatabaseIsLocked(true);
+
+        try {
+            store.start();
+        } catch (IOException ioe) {
+            LOG.warn("Legacy store load failed: ", ioe);
+            throw ioe;
+        } catch (Exception e) {
+            LOG.warn("Legacy store load failed: ", e);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Unloads a previously loaded legacy store to release any resources associated with it.
+     *
+     * Once a store is unloaded it cannot be replayed again until it has been reloaded.
+     * @throws IOException if an error occurs while stopping the legacy store.
+     */
+    public void unload() throws IOException {
+
+        if (store != null) {
+            try {
+                store.stop();
+            } catch (Exception e) {
+                LOG.warn("Legacy store unload failed: ", e);
+                throw new IOException(e);
+            } finally {
+                store = null;
+            }
+        }
+    }
+
+    /**
+     * Performs a replay of scheduled jobs into the target JobSchedulerStore.
+     *
+     * @param targetStore
+     *      The JobSchedulerStore that will receive the replay events from the legacy store.
+     *
+     * @throws IOException if the legacy store is not loaded, the target store is null, or replay fails.
+     */
+    public void startReplay(JobSchedulerStoreImpl targetStore) throws IOException {
+        checkLoaded();
+
+        if (targetStore == null) {
+            throw new IOException("Cannot replay to a null store");
+        }
+
+        try {
+            Set<String> schedulers = store.getJobSchedulerNames();
+            if (!schedulers.isEmpty()) {
+
+                for (String name : schedulers) {
+                    LegacyJobSchedulerImpl scheduler = store.getJobScheduler(name);
+                    LOG.info("Replay of legacy store {} starting.", name);
+                    replayScheduler(scheduler, targetStore);
+                }
+            }
+
+            LOG.info("Replay of legacy store complete.");
+        } catch (IOException ioe) {
+            LOG.warn("Failed during replay of legacy store: ", ioe);
+            throw ioe;
+        } catch (Exception e) {
+            LOG.warn("Failed during replay of legacy store: ", e);
+            throw new IOException(e);
+        }
+    }
+
+    private final void replayScheduler(LegacyJobSchedulerImpl legacy, JobSchedulerStoreImpl target) throws Exception {
+        List<LegacyJobImpl> jobs = legacy.getAllJobs();
+
+        String schedulerName = legacy.getName();
+
+        for (LegacyJobImpl job : jobs) {
+            LOG.trace("Storing job from legacy store to new store: {}", job);
+            KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
+            newJob.setScheduler(schedulerName);
+            newJob.setJobId(job.getJobId());
+            newJob.setStartTime(job.getStartTime());
+            newJob.setCronEntry(job.getCronEntry());
+            newJob.setDelay(job.getDelay());
+            newJob.setPeriod(job.getPeriod());
+            newJob.setRepeat(job.getRepeat());
+            newJob.setNextExecutionTime(job.getNextExecutionTime());
+            newJob.setPayload(job.getPayload());
+
+            target.store(newJob);
+        }
+    }
+
+    private final void checkLoaded() throws IOException {
+        if (this.store == null) {
+            throw new IOException("Cannot replay until legacy store is loaded.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-kahadb-store/src/main/proto/journal-data.proto
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto
index 8290c4c..01607a5 100644
--- a/activemq-kahadb-store/src/main/proto/journal-data.proto
+++ b/activemq-kahadb-store/src/main/proto/journal-data.proto
@@ -32,6 +32,11 @@ enum KahaEntryType {
   KAHA_PRODUCER_AUDIT_COMMAND = 8;
   KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
   KAHA_UPDATE_MESSAGE_COMMAND = 10;
+  KAHA_ADD_SCHEDULED_JOB_COMMAND = 11;
+  KAHA_RESCHEDULE_JOB_COMMAND = 12;
+  KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13;
+  KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14;
+  KAHA_DESTROY_SCHEDULER_COMMAND = 15;
 }
 
 message KahaTraceCommand {
@@ -179,6 +184,62 @@ message KahaLocation {
   required int32 offset = 2;
 }
 
+message KahaAddScheduledJobCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddScheduledJobCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required string scheduler=1;
+  required string job_id=2;
+  required int64 start_time=3;
+  required string cron_entry=4;
+  required int64 delay=5;
+  required int64 period=6;
+  required int32 repeat=7;
+  required bytes payload=8;
+  required int64 next_execution_time=9;
+}
+
+message KahaRescheduleJobCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRescheduleJobCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required string scheduler=1;
+  required string job_id=2;
+  required int64 execution_time=3;
+  required int64 next_execution_time=4;
+  required int32 rescheduled_count=5;
+}
+
+message KahaRemoveScheduledJobCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required string scheduler=1;
+  required string job_id=2;
+  required int64 next_execution_time=3;
+}
+
+message KahaRemoveScheduledJobsCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobsCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required string scheduler=1;
+  required int64 start_time=2;
+  required int64 end_time=3;
+}
+
+message KahaDestroySchedulerCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaDestroySchedulerCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required string scheduler=1;
+}
+
 // TODO things to ponder
 // should we move more message fields
 // that are set by the sender (and rarely required by the broker

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 146269d..4d33a2c 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -35,6 +35,7 @@ import org.apache.activemq.leveldb.util.Log
 import org.apache.activemq.store.PList.PListIterator
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -602,6 +603,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     rc
   }
 
+  def createJobSchedulerStore():JobSchedulerStore = {
+    throw new UnsupportedOperationException();
+  }
+
   def removeTopicMessageStore(destination: ActiveMQTopic): Unit = {
     topics.remove(destination).foreach { store=>
       store.subscriptions.values.foreach { sub =>

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
index bd3904f..efd55f3 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
@@ -25,6 +25,7 @@ import java.io.File
 import java.io.IOException
 import java.util.Set
 import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
+import org.apache.activemq.broker.scheduler.JobSchedulerStore
 
 /**
  */
@@ -44,6 +45,10 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi
     return proxy_target.createTopicMessageStore(destination)
   }
 
+  def createJobSchedulerStore():JobSchedulerStore = {
+    return proxy_target.createJobSchedulerStore()
+  }
+
   def setDirectory(dir: File) {
     proxy_target.setDirectory(dir)
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
index 04277bf..641ee97 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
@@ -39,6 +39,7 @@ public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport {
 
         BrokerService broker = super.createBroker();
         broker.setSchedulerSupport(true);
+        broker.setDataDirectory("target");
         broker.setSchedulerDirectoryFile(schedulerDirectory);
         broker.getSystemUsage().getStoreUsage().setLimit(1 * 512);
         broker.deleteAllMessages();

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
new file mode 100644
index 0000000..8adb980
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.openmbean.TabularData;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests of the JMX JobSchedulerStore management MBean.
+ */
+public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerJmxManagementTests.class);
+
+    @Test
+    public void testJobSchedulerMBeanIsRegistered() throws Exception {
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+    }
+
+    @Test
+    public void testGetNumberOfJobs() throws Exception {
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+        scheduleMessage(60000, -1, -1);
+        assertFalse(view.getAllJobs().isEmpty());
+        assertEquals(1, view.getAllJobs().size());
+        scheduleMessage(60000, -1, -1);
+        assertEquals(2, view.getAllJobs().size());
+    }
+
+    @Test
+    public void testRemvoeJob() throws Exception {
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+        scheduleMessage(60000, -1, -1);
+        assertFalse(view.getAllJobs().isEmpty());
+        TabularData jobs = view.getAllJobs();
+        assertEquals(1, jobs.size());
+        for (Object key : jobs.keySet()) {
+            String jobId = ((List<?>)key).get(0).toString();
+            LOG.info("Attempting to remove Job: {}", jobId);
+            view.removeJob(jobId);
+        }
+        assertTrue(view.getAllJobs().isEmpty());
+    }
+
+    @Test
+    public void testRemvoeJobInRange() throws Exception {
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+        scheduleMessage(60000, -1, -1);
+        assertFalse(view.getAllJobs().isEmpty());
+        String now = JobSupport.getDateTime(System.currentTimeMillis());
+        String later = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000);
+        view.removeAllJobs(now, later);
+        assertTrue(view.getAllJobs().isEmpty());
+    }
+
+    @Test
+    public void testGetNextScheduledJob() throws Exception {
+        JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+        scheduleMessage(60000, -1, -1);
+        assertFalse(view.getAllJobs().isEmpty());
+        long before = System.currentTimeMillis() + 57 * 1000;
+        long toLate = System.currentTimeMillis() + 63 * 1000;
+        String next = view.getNextScheduleTime();
+        long nextTime = JobSupport.getDataTime(next);
+        LOG.info("Next Scheduled Time: {}", next);
+        assertTrue(nextTime > before);
+        assertTrue(nextTime < toLate);
+    }
+
+    @Test
+    public void testGetExecutionCount() throws Exception {
+        final JobSchedulerViewMBean view = getJobSchedulerMBean();
+        assertNotNull(view);
+        assertTrue(view.getAllJobs().isEmpty());
+        scheduleMessage(10000, 1000, 10);
+        assertFalse(view.getAllJobs().isEmpty());
+        TabularData jobs = view.getAllJobs();
+        assertEquals(1, jobs.size());
+        String jobId = null;
+        for (Object key : jobs.keySet()) {
+            jobId = ((List<?>)key).get(0).toString();
+        }
+
+        final String fixedJobId = jobId;
+        LOG.info("Attempting to get execution count for Job: {}", jobId);
+        assertEquals(0, view.getExecutionCount(jobId));
+
+        assertTrue("Should execute again", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getExecutionCount(fixedJobId) > 0;
+            }
+        }));
+    }
+
+    @Override
+    protected boolean isUseJmx() {
+        return true;
+    }
+
+    protected void scheduleMessage(int time, int period, int repeat) throws Exception {
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
+        producer.send(message);
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index bc89d9e..c82f8ef 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -16,7 +16,11 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -29,18 +33,17 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
+public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
 
+    @Test
     public void testRemoveAllScheduled() throws Exception {
         final int COUNT = 5;
         Connection connection = createConnection();
@@ -77,6 +80,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         assertEquals(latch.getCount(), COUNT);
     }
 
+    @Test
     public void testRemoveAllScheduledAtTime() throws Exception {
         final int COUNT = 3;
         Connection connection = createConnection();
@@ -122,8 +126,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         // Send the remove request
         MessageProducer producer = session.createProducer(management);
         Message request = session.createMessage();
-        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
-                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
         request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
         request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
         producer.send(request);
@@ -143,6 +146,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         assertEquals(2, latch.getCount());
     }
 
+    @Test
     public void testBrowseAllScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -191,7 +195,8 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         Thread.sleep(2000);
         assertEquals(latch.getCount(), COUNT);
 
-        // now see if we got all the scheduled messages on the browse destination.
+        // now see if we got all the scheduled messages on the browse
+        // destination.
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(browsedLatch.getCount(), 0);
 
@@ -200,6 +205,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         assertEquals(latch.getCount(), 0);
     }
 
+    @Test
     public void testBrowseWindowlScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -255,15 +261,18 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         Thread.sleep(2000);
         assertEquals(COUNT + 2, latch.getCount());
 
-        // now see if we got all the scheduled messages on the browse destination.
+        // now see if we got all the scheduled messages on the browse
+        // destination.
         latch.await(15, TimeUnit.SECONDS);
         assertEquals(0, browsedLatch.getCount());
 
-        // now see if we got all the scheduled messages on the browse destination.
+        // now see if we got all the scheduled messages on the browse
+        // destination.
         latch.await(20, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
     }
 
+    @Test
     public void testRemoveScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -297,8 +306,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
 
         // Send the browse request
         Message request = session.createMessage();
-        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
-                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
         request.setJMSReplyTo(browseDest);
         producer.send(request);
 
@@ -307,14 +315,12 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
             Message message = browser.receive(2000);
             assertNotNull(message);
 
-            try{
+            try {
                 Message remove = session.createMessage();
-                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
-                        ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
-                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
-                        message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
+                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
+                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
                 producer.send(remove);
-            } catch(Exception e) {
+            } catch (Exception e) {
             }
         }
 
@@ -323,6 +329,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         assertEquals(COUNT, latch.getCount());
     }
 
+    @Test
     public void testRemoveNotScheduled() throws Exception {
         Connection connection = createConnection();
 
@@ -333,19 +340,19 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
 
         MessageProducer producer = session.createProducer(management);
 
-        try{
+        try {
 
             // Send the remove request
             Message remove = session.createMessage();
-            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
-                    ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
             remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
             producer.send(remove);
-        } catch(Exception e) {
+        } catch (Exception e) {
             fail("Caught unexpected exception during remove of unscheduled message.");
         }
     }
 
+    @Test
     public void testBrowseWithSelector() throws Exception {
         Connection connection = createConnection();
 
@@ -362,7 +369,7 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         Destination browseDest = session.createTemporaryTopic();
 
         // Create the "Browser"
-        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" );
+        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000");
 
         connection.start();
 
@@ -383,7 +390,6 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         assertNull(message);
     }
 
-
     protected void scheduleMessage(Connection connection, long delay) throws Exception {
         scheduleMessage(connection, delay, 1);
     }
@@ -394,38 +400,10 @@ public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
         TextMessage message = session.createTextMessage("test msg");
         message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
 
-        for(int i = 0; i < count; ++i ) {
+        for (int i = 0; i < count; ++i) {
             producer.send(message);
         }
 
         producer.close();
     }
-
-    @Override
-    protected void setUp() throws Exception {
-        bindAddress = "vm://localhost";
-        super.setUp();
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(true);
-    }
-
-    protected BrokerService createBroker(boolean delete) throws Exception {
-        File schedulerDirectory = new File("target/scheduler");
-        if (delete) {
-            IOHelper.mkdirs(schedulerDirectory);
-            IOHelper.deleteChildren(schedulerDirectory);
-        }
-        BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerDirectoryFile(schedulerDirectory);
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        answer.addConnector(bindAddress);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
new file mode 100644
index 0000000..c013a4c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerStoreCheckpointTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreCheckpointTest.class);
+
+    private JobSchedulerStoreImpl store;
+    private JobScheduler scheduler;
+    private ByteSequence payload;
+
+    @Before
+    public void setUp() throws Exception {
+        File directory = new File("target/test/ScheduledJobsDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        startStore(directory);
+
+        byte[] data = new byte[8192];
+        for (int i = 0; i < data.length; ++i) {
+            data[i] = (byte) (i % 256);
+        }
+
+        payload = new ByteSequence(data);
+    }
+
+    protected void startStore(File directory) throws Exception {
+        store = new JobSchedulerStoreImpl();
+        store.setDirectory(directory);
+        store.setCheckpointInterval(5000);
+        store.setCleanupInterval(10000);
+        store.setJournalMaxFileLength(10 * 1024);
+        store.start();
+        scheduler = store.getJobScheduler("test");
+        scheduler.startDispatching();
+    }
+
+    private int getNumJournalFiles() throws IOException {
+        return store.getJournal().getFileMap().size();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        scheduler.stopDispatching();
+        store.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+
+        long time = TimeUnit.SECONDS.toMillis(30);
+        for (int i = 0; i < COUNT; i++) {
+            scheduler.schedule("id" + i, payload, "", time, 0, 0);
+        }
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(size, COUNT);
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+        // need a little slack so go over 60 seconds
+        assertTrue(latch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+
+        for (int i = 0; i < COUNT; i++) {
+            scheduler.schedule("id" + i, payload, "", time, 0, 0);
+        }
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+        // need a little slack so go over 60 seconds
+        assertTrue(latch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+
+        assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
index 0e0c1d7..df1e7ff 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
@@ -16,50 +16,62 @@
  */
 package org.apache.activemq.broker.scheduler;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobSchedulerStoreTest {
 
-public class JobSchedulerStoreTest extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreTest.class);
 
+    @Test(timeout = 120 * 1000)
     public void testRestart() throws Exception {
         JobSchedulerStore store = new JobSchedulerStoreImpl();
         File directory = new File("target/test/ScheduledDB");
-          IOHelper.mkdirs(directory);
-          IOHelper.deleteChildren(directory);
-          store.setDirectory(directory);
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store.setDirectory(directory);
         final int NUMBER = 1000;
         store.start();
-        List<ByteSequence>list = new ArrayList<ByteSequence>();
-        for (int i = 0; i < NUMBER;i++ ) {
-            ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes());
+        List<ByteSequence> list = new ArrayList<ByteSequence>();
+        for (int i = 0; i < NUMBER; i++) {
+            ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes());
             list.add(buff);
         }
+
         JobScheduler js = store.getJobScheduler("test");
         js.startDispatching();
         int count = 0;
-        long startTime = 10 * 60 * 1000; long period = startTime;
-        for (ByteSequence job:list) {
-            js.schedule("id:"+(count++), job, "", startTime, period, -1);
+        long startTime = 10 * 60 * 1000;
+        long period = startTime;
+        for (ByteSequence job : list) {
+            js.schedule("id:" + (count++), job, "", startTime, period, -1);
         }
-        List<Job>test = js.getAllJobs();
-        assertEquals(list.size(),test.size());
+
+        List<Job> test = js.getAllJobs();
+        LOG.debug("Found {} jobs in the store before restart", test.size());
+        assertEquals(list.size(), test.size());
         store.stop();
 
         store.start();
         js = store.getJobScheduler("test");
         test = js.getAllJobs();
-        assertEquals(list.size(),test.size());
-        for (int i = 0; i < list.size();i++) {
+        LOG.debug("Found {} jobs in the store after restart", test.size());
+        assertEquals(list.size(), test.size());
+
+        for (int i = 0; i < list.size(); i++) {
             String orig = new String(list.get(i).getData());
             String payload = new String(test.get(i).getPayload());
-            assertEquals(orig,payload);
+            assertEquals(orig, payload);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
index 5126970..2210eba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
@@ -31,8 +31,13 @@ import org.apache.activemq.util.IOHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JobSchedulerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class);
+
     private JobSchedulerStore store;
     private JobScheduler scheduler;
 
@@ -173,6 +178,37 @@ public class JobSchedulerTest {
     }
 
     @Test
+    public void testGetExecutionCount() throws Exception {
+        final String jobId = "Job-1";
+        long time = 10000;
+        final CountDownLatch done = new CountDownLatch(10);
+
+        String str = new String("test");
+        scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10);
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(size, 1);
+
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                LOG.info("Job exectued: {}", 11 - done.getCount());
+                done.countDown();
+            }
+        });
+
+        List<Job> jobs = scheduler.getNextScheduleJobs();
+        assertEquals(1, jobs.size());
+        Job job = jobs.get(0);
+        assertEquals(jobId, job.getJobId());
+        assertEquals(0, job.getExecutionCount());
+        assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS));
+        // The job is not updated on the last firing as it is removed from the store following
+        // its last execution so the count will always be one less than the max firings.
+        assertTrue(job.getExecutionCount() >= 9);
+    }
+
+    @Test
     public void testgetAllJobs() throws Exception {
         final int COUNT = 10;
         final String ID = "id:";

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
new file mode 100644
index 0000000..2b25797
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for tests of the Broker's JobSchedulerStore.
+ */
+public class JobSchedulerTestSupport {
+
+    @Rule public TestName name = new TestName();
+
+    protected String connectionUri;
+    protected BrokerService broker;
+    protected JobScheduler jobScheduler;
+    protected Queue destination;
+
+    @Before
+    public void setUp() throws Exception {
+        connectionUri = "vm://localhost";
+        destination = new ActiveMQQueue(name.getMethodName());
+
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        jobScheduler = broker.getJobSchedulerStore().getJobScheduler("JMS");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    protected Connection createConnection() throws Exception {
+        return createConnectionFactory().createConnection();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected boolean isUseJmx() {
+        return false;
+    }
+
+    protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
+        ObjectName objectName = broker.getAdminView().getJMSJobScheduler();
+        JobSchedulerViewMBean scheduler = null;
+        if (objectName != null) {
+            scheduler = (JobSchedulerViewMBean) broker.getManagementContext()
+                .newProxyInstance(objectName, JobSchedulerViewMBean.class, true);
+        }
+
+        return scheduler;
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(isUseJmx());
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
new file mode 100644
index 0000000..9d0fef2
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
@@ -0,0 +1,179 @@
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KahaDBSchedulerIndexRebuildTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class);
+
+    private BrokerService broker = null;
+    private final int NUM_JOBS = 50;
+
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
+            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath();
+        } catch (IOException e) {
+            basedir = ".";
+        }
+    }
+
+    private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler");
+    private final File storeDir = new File(basedir, "activemq-data/store/");
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Test Dir = {}", schedulerStoreDir);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Test
+    public void testIndexRebuilds() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+
+        JobSchedulerStoreImpl schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        connection.start();
+        for (int i = 0; i < NUM_JOBS; ++i) {
+            scheduleRepeating(connection);
+        }
+        connection.close();
+
+        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+
+        broker.stop();
+
+        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
+
+        schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+
+        scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+    }
+
+    @Test
+    public void testIndexRebuildsAfterSomeJobsExpire() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+
+        JobSchedulerStoreImpl schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        connection.start();
+        for (int i = 0; i < NUM_JOBS; ++i) {
+            scheduleRepeating(connection);
+            scheduleOneShot(connection);
+        }
+        connection.close();
+
+        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS * 2, scheduler.getAllJobs().size());
+
+        final JobScheduler awaitingOneShotTimeout = scheduler;
+        assertTrue("One shot jobs should time out", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return awaitingOneShotTimeout.getAllJobs().size() == NUM_JOBS;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        broker.stop();
+
+        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
+
+        schedulerStore = createScheduler();
+        broker = createBroker(schedulerStore);
+        broker.start();
+
+        scheduler = schedulerStore.getJobScheduler("JMS");
+        assertNotNull(scheduler);
+        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
+    }
+
+    private void scheduleRepeating(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.queue");
+        MessageProducer producer = session.createProducer(queue);
+
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 360 * 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+        producer.send(message);
+        producer.close();
+    }
+
+    private void scheduleOneShot(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.queue");
+        MessageProducer producer = session.createProducer(queue);
+
+        TextMessage message = session.createTextMessage("test msg");
+        long time = TimeUnit.SECONDS.toMillis(30);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
+        producer.send(message);
+        producer.close();
+    }
+
+    protected JobSchedulerStoreImpl createScheduler() {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        scheduler.setDirectory(schedulerStoreDir);
+        scheduler.setJournalMaxFileLength(10 * 1024);
+        return scheduler;
+    }
+
+    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setJobSchedulerStore(scheduler);
+        answer.setPersistent(true);
+        answer.setDataDirectory(storeDir.getAbsolutePath());
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
new file mode 100644
index 0000000..30da10d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the store recovers even if some log files are missing.
+ */
+public class KahaDBSchedulerMissingJournalLogsTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class);
+
+    private BrokerService broker = null;
+    private JobSchedulerStoreImpl schedulerStore = null;
+
+    private final int NUM_LOGS = 6;
+
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
+            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath();
+        } catch (IOException e) {
+            basedir = ".";
+        }
+    }
+
+    private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler");
+    private final File storeDir = new File(basedir, "activemq-data/store/");
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+        IOHelper.deleteFile(schedulerStoreDir);
+        LOG.info("Test Dir = {}", schedulerStoreDir);
+
+        createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        schedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout=120 * 1000)
+    public void testMissingLogsCausesBrokerToFail() throws Exception {
+        fillUpSomeLogFiles();
+
+        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        LOG.info("There are {} jobs in the store.", jobCount);
+
+        List<File> toDelete = new ArrayList<File>();
+        Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap();
+        for (int i = files.size(); i > files.size() / 2; i--) {
+            toDelete.add(files.get(i).getFile());
+        }
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        for (File file : toDelete) {
+            LOG.info("File to delete: {}", file);
+            IOHelper.delete(file);
+        }
+
+        try {
+            createBroker();
+            fail("Should not start when logs are missing.");
+        } catch (Exception e) {
+        }
+    }
+
+    @Test(timeout=120 * 1000)
+    public void testRecoverWhenSomeLogsAreMissing() throws Exception {
+        fillUpSomeLogFiles();
+
+        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        LOG.info("There are {} jobs in the store.", jobCount);
+
+        List<File> toDelete = new ArrayList<File>();
+        Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap();
+        for (int i = files.size() - 1; i > files.size() / 2; i--) {
+            toDelete.add(files.get(i).getFile());
+        }
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        for (File file : toDelete) {
+            LOG.info("File to delete: {}", file);
+            IOHelper.delete(file);
+        }
+
+        schedulerStore = createScheduler();
+        schedulerStore.setIgnoreMissingJournalfiles(true);
+
+        createBroker(schedulerStore);
+        broker.start();
+        broker.waitUntilStarted();
+
+        int postRecoverJobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
+        assertTrue(postRecoverJobCount > 0);
+        assertTrue(postRecoverJobCount < jobCount);
+    }
+
+    private void fillUpSomeLogFiles() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.queue");
+        MessageProducer producer = session.createProducer(queue);
+        connection.start();
+        while (true) {
+            scheduleRepeating(session, producer);
+            if (schedulerStore.getJournal().getFileMap().size() == NUM_LOGS) {
+                break;
+            }
+        }
+        connection.close();
+    }
+
+    private void scheduleRepeating(Session session, MessageProducer producer) throws Exception {
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 360 * 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+        producer.send(message);
+    }
+
+    protected JobSchedulerStoreImpl createScheduler() {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        scheduler.setDirectory(schedulerStoreDir);
+        scheduler.setJournalMaxFileLength(10 * 1024);
+        return scheduler;
+    }
+
+    protected void createBroker() throws Exception {
+        createBroker(createScheduler());
+    }
+
+    protected void createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
+        broker = new BrokerService();
+        broker.setJobSchedulerStore(scheduler);
+        broker.setPersistent(true);
+        broker.setDataDirectory(storeDir.getAbsolutePath());
+        broker.setSchedulerSupport(true);
+        broker.setUseJmx(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
new file mode 100644
index 0000000..721f417
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that the archived "legacy" scheduler store under the test resources
+ * can be opened by {@link JobSchedulerStoreImpl} and that the repeating job it
+ * holds keeps delivering messages across several broker restarts.
+ */
+public class SchedulerDBVersionTest {
+    /** Module base directory, resolved two levels above this class's code source location. */
+    static String basedir;
+    static {
+        try {
+            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
+            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath();
+        } catch (IOException e) {
+            // Fall back to the working directory when the path cannot be canonicalized.
+            basedir = ".";
+        }
+    }
+
+    static final Logger LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class);
+
+    /** Location of the archived legacy-format scheduler store used as test input. */
+    final static File VERSION_LEGACY_JMS =
+        new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
+
+    /** Broker currently in use by a test; stopped by {@link #tearDown()} if still set. */
+    BrokerService broker = null;
+
+    /**
+     * Builds (but does not start) a persistent, scheduler-enabled broker with
+     * JMX disabled that uses the given scheduler store.
+     *
+     * @param scheduler the job scheduler store to attach to the broker
+     * @return the configured broker
+     * @throws Exception if the broker cannot be configured
+     */
+    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setJobSchedulerStore(scheduler);
+        answer.setPersistent(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        return answer;
+    }
+
+    /**
+     * Stops whichever broker the last test left in the {@code broker} field.
+     *
+     * @throws Exception if stopping the broker fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Regenerates the archived store by scheduling one repeating message
+     * against a fresh store in the test resources directory. Disabled by
+     * default because it overwrites the checked-in test data.
+     *
+     * @throws Exception if the broker or the JMS client fails
+     */
+    @Ignore("Used only when a new version of the store needs to archive it's test data.")
+    @Test
+    public void testCreateStore() throws Exception {
+        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+        File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
+        IOHelper.deleteFile(dir);
+        scheduler.setDirectory(dir);
+        scheduler.setJournalMaxFileLength(1024 * 1024);
+        broker = createBroker(scheduler);
+        broker.start();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            scheduleRepeating(connection);
+        } finally {
+            // Always release the connection, even when scheduling fails.
+            connection.close();
+        }
+        broker.stop();
+    }
+
+    /**
+     * Sends one text message to {@code test.queue} scheduled with a 1000 ms
+     * delay, a 500 ms period and a repeat count of -1.
+     *
+     * @param connection connection used to create the session and producer
+     * @throws Exception if any JMS operation fails
+     */
+    private void scheduleRepeating(Connection connection) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test.queue");
+        MessageProducer producer = session.createProducer(queue);
+
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
+        producer.send(message);
+        producer.close();
+    }
+
+    /**
+     * Runs the restart scenario against the archived legacy store.
+     *
+     * @throws Exception if the scenario fails
+     */
+    @Test
+    public void testLegacyStoreConversion() throws Exception {
+        doTestScheduleRepeated(VERSION_LEGACY_JMS);
+    }
+
+    /**
+     * Copies {@code existingStore} into a scratch directory, then starts and
+     * stops a broker on it three times, expecting ten messages on
+     * {@code test.queue} within 30 seconds on every run.
+     *
+     * @param existingStore directory holding the store to copy and load
+     * @throws Exception if the broker or the JMS client fails
+     */
+    public void doTestScheduleRepeated(File existingStore) throws Exception {
+        File testDir = new File("target/activemq-data/store/scheduler/versionDB");
+        IOHelper.deleteFile(testDir);
+        IOHelper.copyFile(existingStore, testDir);
+
+        final int NUMBER = 10;
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+
+        for (int i = 0; i < 3; ++i) {
+            JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
+            scheduler.setDirectory(testDir);
+            scheduler.setJournalMaxFileLength(1024 * 1024);
+            // Use the field rather than a shadowing local so tearDown() can stop
+            // the broker when an assertion below fails part-way through.
+            broker = createBroker(scheduler);
+            broker.start();
+            broker.waitUntilStarted();
+
+            final AtomicInteger count = new AtomicInteger();
+            final CountDownLatch latch = new CountDownLatch(NUMBER);
+
+            Connection connection = cf.createConnection();
+            try {
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Queue queue = session.createQueue("test.queue");
+
+                MessageConsumer consumer = session.createConsumer(queue);
+                consumer.setMessageListener(new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        LOG.info("Received scheduled message: {}", message);
+                        latch.countDown();
+                        count.incrementAndGet();
+                    }
+                });
+
+                // Check the starting count before delivery is enabled; once the
+                // connection is started the listener may already be counting down.
+                assertEquals(NUMBER, latch.getCount());
+                connection.start();
+                latch.await(30, TimeUnit.SECONDS);
+            } finally {
+                connection.close();
+            }
+
+            broker.stop();
+            broker.waitUntilStopped();
+
+            assertEquals(0, latch.getCount());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties
index 564ed9e..85516aa 100755
--- a/activemq-unit-tests/src/test/resources/log4j.properties
+++ b/activemq-unit-tests/src/test/resources/log4j.properties
@@ -21,6 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
+#log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log
new file mode 100644
index 0000000..342f8c7
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log differ

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
new file mode 100644
index 0000000..30c937d
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data differ

http://git-wip-us.apache.org/repos/asf/activemq/blob/74846bb2/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
new file mode 100644
index 0000000..b06e549
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo differ


Mime
View raw message