activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [1/5] activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-3758"
Date Thu, 25 Dec 2014 04:02:23 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.10.x 76357bdb1 -> 3424e04fa


http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
deleted file mode 100644
index 92563f4..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyStoreReplayer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to upgrade a Legacy Job Scheduler store to the latest version this class
- * loads a found legacy scheduler store and generates new add commands for all
- * jobs currently in the store.
- */
-public class LegacyStoreReplayer {
-
-    static final Logger LOG = LoggerFactory.getLogger(LegacyStoreReplayer.class);
-
-    private LegacyJobSchedulerStoreImpl store;
-    private final File legacyStoreDirectory;
-
-    /**
-     * Creates a new Legacy Store Replayer with the given target store
-     * @param targetStore
-     * @param directory
-     */
-    public LegacyStoreReplayer(File directory) {
-        this.legacyStoreDirectory = directory;
-    }
-
-    /**
-     * Loads the legacy store and prepares it for replay into a newer Store instance.
-     *
-     * @throws IOException if an error occurs while reading in the legacy store.
-     */
-    public void load() throws IOException {
-
-        store = new LegacyJobSchedulerStoreImpl();
-        store.setDirectory(legacyStoreDirectory);
-        store.setFailIfDatabaseIsLocked(true);
-
-        try {
-            store.start();
-        } catch (IOException ioe) {
-            LOG.warn("Legacy store load failed: ", ioe);
-            throw ioe;
-        } catch (Exception e) {
-            LOG.warn("Legacy store load failed: ", e);
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * Unloads a previously loaded legacy store to release any resources associated with it.
-     *
-     * Once a store is unloaded it cannot be replayed again until it has been reloaded.
-     * @throws IOException
-     */
-    public void unload() throws IOException {
-
-        if (store != null) {
-            try {
-                store.stop();
-            } catch (Exception e) {
-                LOG.warn("Legacy store unload failed: ", e);
-                throw new IOException(e);
-            } finally {
-                store = null;
-            }
-        }
-    }
-
-    /**
-     * Performs a replay of scheduled jobs into the target JobSchedulerStore.
-     *
-     * @param targetStore
-     *      The JobSchedulerStore that will receive the replay events from the legacy store.
-     *
-     * @throws IOException if an error occurs during replay of the legacy store.
-     */
-    public void startReplay(JobSchedulerStoreImpl targetStore) throws IOException {
-        checkLoaded();
-
-        if (targetStore == null) {
-            throw new IOException("Cannot replay to a null store");
-        }
-
-        try {
-            Set<String> schedulers = store.getJobSchedulerNames();
-            if (!schedulers.isEmpty()) {
-
-                for (String name : schedulers) {
-                    LegacyJobSchedulerImpl scheduler = store.getJobScheduler(name);
-                    LOG.info("Replay of legacy store {} starting.", name);
-                    replayScheduler(scheduler, targetStore);
-                }
-            }
-
-            LOG.info("Replay of legacy store complate.");
-        } catch (IOException ioe) {
-            LOG.warn("Failed during replay of legacy store: ", ioe);
-            throw ioe;
-        } catch (Exception e) {
-            LOG.warn("Failed during replay of legacy store: ", e);
-            throw new IOException(e);
-        }
-    }
-
-    private final void replayScheduler(LegacyJobSchedulerImpl legacy, JobSchedulerStoreImpl target) throws Exception {
-        List<LegacyJobImpl> jobs = legacy.getAllJobs();
-
-        String schedulerName = legacy.getName();
-
-        for (LegacyJobImpl job : jobs) {
-            LOG.trace("Storing job from legacy store to new store: {}", job);
-            KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
-            newJob.setScheduler(schedulerName);
-            newJob.setJobId(job.getJobId());
-            newJob.setStartTime(job.getStartTime());
-            newJob.setCronEntry(job.getCronEntry());
-            newJob.setDelay(job.getDelay());
-            newJob.setPeriod(job.getPeriod());
-            newJob.setRepeat(job.getRepeat());
-            newJob.setNextExecutionTime(job.getNextExecutionTime());
-            newJob.setPayload(job.getPayload());
-
-            target.store(newJob);
-        }
-    }
-
-    private final void checkLoaded() throws IOException {
-        if (this.store == null) {
-            throw new IOException("Cannot replay until legacy store is loaded.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/proto/journal-data.proto
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto
index 01607a5..8290c4c 100644
--- a/activemq-kahadb-store/src/main/proto/journal-data.proto
+++ b/activemq-kahadb-store/src/main/proto/journal-data.proto
@@ -32,11 +32,6 @@ enum KahaEntryType {
   KAHA_PRODUCER_AUDIT_COMMAND = 8;
   KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
   KAHA_UPDATE_MESSAGE_COMMAND = 10;
-  KAHA_ADD_SCHEDULED_JOB_COMMAND = 11;
-  KAHA_RESCHEDULE_JOB_COMMAND = 12;
-  KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13;
-  KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14;
-  KAHA_DESTROY_SCHEDULER_COMMAND = 15;
 }
 
 message KahaTraceCommand {
@@ -184,62 +179,6 @@ message KahaLocation {
   required int32 offset = 2;
 }
 
-message KahaAddScheduledJobCommand {
-  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddScheduledJobCommand>";
-  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
-  //| option java_type_method = "KahaEntryType";
-
-  required string scheduler=1;
-  required string job_id=2;
-  required int64 start_time=3;
-  required string cron_entry=4;
-  required int64 delay=5;
-  required int64 period=6;
-  required int32 repeat=7;
-  required bytes payload=8;
-  required int64 next_execution_time=9;
-}
-
-message KahaRescheduleJobCommand {
-  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRescheduleJobCommand>";
-  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
-  //| option java_type_method = "KahaEntryType";
-
-  required string scheduler=1;
-  required string job_id=2;
-  required int64 execution_time=3;
-  required int64 next_execution_time=4;
-  required int32 rescheduled_count=5;
-}
-
-message KahaRemoveScheduledJobCommand {
-  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobCommand>";
-  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
-  //| option java_type_method = "KahaEntryType";
-
-  required string scheduler=1;
-  required string job_id=2;
-  required int64 next_execution_time=3;
-}
-
-message KahaRemoveScheduledJobsCommand {
-  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveScheduledJobsCommand>";
-  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
-  //| option java_type_method = "KahaEntryType";
-
-  required string scheduler=1;
-  required int64 start_time=2;
-  required int64 end_time=3;
-}
-
-message KahaDestroySchedulerCommand {
-  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaDestroySchedulerCommand>";
-  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
-  //| option java_type_method = "KahaEntryType";
-
-  required string scheduler=1;
-}
-
 // TODO things to ponder
 // should we move more message fields
 // that are set by the sender (and rarely required by the broker

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 01d5170..42b25c6 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -35,7 +35,6 @@ import org.apache.activemq.leveldb.util.Log
 import org.apache.activemq.store.PList.PListIterator
 import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -605,10 +604,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     rc
   }
 
-  def createJobSchedulerStore():JobSchedulerStore = {
-    throw new UnsupportedOperationException();
-  }
-
   def removeTopicMessageStore(destination: ActiveMQTopic): Unit = {
     topics.remove(destination).foreach { store=>
       store.subscriptions.values.foreach { sub =>

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
index efd55f3..bd3904f 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
@@ -25,7 +25,6 @@ import java.io.File
 import java.io.IOException
 import java.util.Set
 import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
-import org.apache.activemq.broker.scheduler.JobSchedulerStore
 
 /**
  */
@@ -45,10 +44,6 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi
     return proxy_target.createTopicMessageStore(destination)
   }
 
-  def createJobSchedulerStore():JobSchedulerStore = {
-    return proxy_target.createJobSchedulerStore()
-  }
-
   def setDirectory(dir: File) {
     proxy_target.setDirectory(dir)
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
index 641ee97..04277bf 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
@@ -39,7 +39,6 @@ public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport {
 
         BrokerService broker = super.createBroker();
         broker.setSchedulerSupport(true);
-        broker.setDataDirectory("target");
         broker.setSchedulerDirectoryFile(schedulerDirectory);
         broker.getSystemUsage().getStoreUsage().setLimit(1 * 512);
         broker.deleteAllMessages();

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
deleted file mode 100644
index 8adb980..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.broker.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.openmbean.TabularData;
-
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.util.Wait;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests of the JMX JobSchedulerStore management MBean.
- */
-public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerJmxManagementTests.class);
-
-    @Test
-    public void testJobSchedulerMBeanIsRegistered() throws Exception {
-        JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-    }
-
-    @Test
-    public void testGetNumberOfJobs() throws Exception {
-        JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-        scheduleMessage(60000, -1, -1);
-        assertFalse(view.getAllJobs().isEmpty());
-        assertEquals(1, view.getAllJobs().size());
-        scheduleMessage(60000, -1, -1);
-        assertEquals(2, view.getAllJobs().size());
-    }
-
-    @Test
-    public void testRemvoeJob() throws Exception {
-        JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-        scheduleMessage(60000, -1, -1);
-        assertFalse(view.getAllJobs().isEmpty());
-        TabularData jobs = view.getAllJobs();
-        assertEquals(1, jobs.size());
-        for (Object key : jobs.keySet()) {
-            String jobId = ((List<?>)key).get(0).toString();
-            LOG.info("Attempting to remove Job: {}", jobId);
-            view.removeJob(jobId);
-        }
-        assertTrue(view.getAllJobs().isEmpty());
-    }
-
-    @Test
-    public void testRemvoeJobInRange() throws Exception {
-        JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-        scheduleMessage(60000, -1, -1);
-        assertFalse(view.getAllJobs().isEmpty());
-        String now = JobSupport.getDateTime(System.currentTimeMillis());
-        String later = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000);
-        view.removeAllJobs(now, later);
-        assertTrue(view.getAllJobs().isEmpty());
-    }
-
-    @Test
-    public void testGetNextScheduledJob() throws Exception {
-        JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-        scheduleMessage(60000, -1, -1);
-        assertFalse(view.getAllJobs().isEmpty());
-        long before = System.currentTimeMillis() + 57 * 1000;
-        long toLate = System.currentTimeMillis() + 63 * 1000;
-        String next = view.getNextScheduleTime();
-        long nextTime = JobSupport.getDataTime(next);
-        LOG.info("Next Scheduled Time: {}", next);
-        assertTrue(nextTime > before);
-        assertTrue(nextTime < toLate);
-    }
-
-    @Test
-    public void testGetExecutionCount() throws Exception {
-        final JobSchedulerViewMBean view = getJobSchedulerMBean();
-        assertNotNull(view);
-        assertTrue(view.getAllJobs().isEmpty());
-        scheduleMessage(10000, 1000, 10);
-        assertFalse(view.getAllJobs().isEmpty());
-        TabularData jobs = view.getAllJobs();
-        assertEquals(1, jobs.size());
-        String jobId = null;
-        for (Object key : jobs.keySet()) {
-            jobId = ((List<?>)key).get(0).toString();
-        }
-
-        final String fixedJobId = jobId;
-        LOG.info("Attempting to get execution count for Job: {}", jobId);
-        assertEquals(0, view.getExecutionCount(jobId));
-
-        assertTrue("Should execute again", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return view.getExecutionCount(fixedJobId) > 0;
-            }
-        }));
-    }
-
-    @Override
-    protected boolean isUseJmx() {
-        return true;
-    }
-
-    protected void scheduleMessage(int time, int period, int repeat) throws Exception {
-        Connection connection = createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(destination);
-        TextMessage message = session.createTextMessage("test msg");
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
-        producer.send(message);
-        connection.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index c82f8ef..bc89d9e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -16,11 +16,7 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
+import java.io.File;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -33,17 +29,18 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.IdGenerator;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
+public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
 
-    @Test
     public void testRemoveAllScheduled() throws Exception {
         final int COUNT = 5;
         Connection connection = createConnection();
@@ -80,7 +77,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertEquals(latch.getCount(), COUNT);
     }
 
-    @Test
     public void testRemoveAllScheduledAtTime() throws Exception {
         final int COUNT = 3;
         Connection connection = createConnection();
@@ -126,7 +122,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         // Send the remove request
         MessageProducer producer = session.createProducer(management);
         Message request = session.createMessage();
-        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
         request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
         request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
         producer.send(request);
@@ -146,7 +143,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertEquals(2, latch.getCount());
     }
 
-    @Test
     public void testBrowseAllScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -195,8 +191,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         Thread.sleep(2000);
         assertEquals(latch.getCount(), COUNT);
 
-        // now see if we got all the scheduled messages on the browse
-        // destination.
+        // now see if we got all the scheduled messages on the browse destination.
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(browsedLatch.getCount(), 0);
 
@@ -205,7 +200,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertEquals(latch.getCount(), 0);
     }
 
-    @Test
     public void testBrowseWindowlScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -261,18 +255,15 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         Thread.sleep(2000);
         assertEquals(COUNT + 2, latch.getCount());
 
-        // now see if we got all the scheduled messages on the browse
-        // destination.
+        // now see if we got all the scheduled messages on the browse destination.
         latch.await(15, TimeUnit.SECONDS);
         assertEquals(0, browsedLatch.getCount());
 
-        // now see if we got all the scheduled messages on the browse
-        // destination.
+        // now see if we got all the scheduled messages on the browse destination.
         latch.await(20, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
     }
 
-    @Test
     public void testRemoveScheduled() throws Exception {
         final int COUNT = 10;
         Connection connection = createConnection();
@@ -306,7 +297,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
 
         // Send the browse request
         Message request = session.createMessage();
-        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
+        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
         request.setJMSReplyTo(browseDest);
         producer.send(request);
 
@@ -315,12 +307,14 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
             Message message = browser.receive(2000);
             assertNotNull(message);
 
-            try {
+            try{
                 Message remove = session.createMessage();
-                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
-                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
+                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+                        ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
+                remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
+                        message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
                 producer.send(remove);
-            } catch (Exception e) {
+            } catch(Exception e) {
             }
         }
 
@@ -329,7 +323,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertEquals(COUNT, latch.getCount());
     }
 
-    @Test
     public void testRemoveNotScheduled() throws Exception {
         Connection connection = createConnection();
 
@@ -340,19 +333,19 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
 
         MessageProducer producer = session.createProducer(management);
 
-        try {
+        try{
 
             // Send the remove request
             Message remove = session.createMessage();
-            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
+            remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
+                    ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
             remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId());
             producer.send(remove);
-        } catch (Exception e) {
+        } catch(Exception e) {
             fail("Caught unexpected exception during remove of unscheduled message.");
         }
     }
 
-    @Test
     public void testBrowseWithSelector() throws Exception {
         Connection connection = createConnection();
 
@@ -369,7 +362,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         Destination browseDest = session.createTemporaryTopic();
 
         // Create the "Browser"
-        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000");
+        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" );
 
         connection.start();
 
@@ -390,6 +383,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         assertNull(message);
     }
 
+
     protected void scheduleMessage(Connection connection, long delay) throws Exception {
         scheduleMessage(connection, delay, 1);
     }
@@ -400,10 +394,38 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         TextMessage message = session.createTextMessage("test msg");
         message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
 
-        for (int i = 0; i < count; ++i) {
+        for(int i = 0; i < count; ++i ) {
             producer.send(message);
         }
 
         producer.close();
     }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
deleted file mode 100644
index c013a4c..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.broker.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JobSchedulerStoreCheckpointTest {
-
-    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreCheckpointTest.class);
-
-    private JobSchedulerStoreImpl store;
-    private JobScheduler scheduler;
-    private ByteSequence payload;
-
-    @Before
-    public void setUp() throws Exception {
-        File directory = new File("target/test/ScheduledJobsDB");
-        IOHelper.mkdirs(directory);
-        IOHelper.deleteChildren(directory);
-        startStore(directory);
-
-        byte[] data = new byte[8192];
-        for (int i = 0; i < data.length; ++i) {
-            data[i] = (byte) (i % 256);
-        }
-
-        payload = new ByteSequence(data);
-    }
-
-    protected void startStore(File directory) throws Exception {
-        store = new JobSchedulerStoreImpl();
-        store.setDirectory(directory);
-        store.setCheckpointInterval(5000);
-        store.setCleanupInterval(10000);
-        store.setJournalMaxFileLength(10 * 1024);
-        store.start();
-        scheduler = store.getJobScheduler("test");
-        scheduler.startDispatching();
-    }
-
-    private int getNumJournalFiles() throws IOException {
-        return store.getJournal().getFileMap().size();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        scheduler.stopDispatching();
-        store.stop();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final int COUNT = 10;
-        final CountDownLatch latch = new CountDownLatch(COUNT);
-        scheduler.addListener(new JobListener() {
-            @Override
-            public void scheduledJob(String id, ByteSequence job) {
-                latch.countDown();
-            }
-        });
-
-        long time = TimeUnit.SECONDS.toMillis(30);
-        for (int i = 0; i < COUNT; i++) {
-            scheduler.schedule("id" + i, payload, "", time, 0, 0);
-        }
-
-        int size = scheduler.getAllJobs().size();
-        assertEquals(size, COUNT);
-
-        LOG.info("Number of journal log files: {}", getNumJournalFiles());
-        // need a little slack so go over 60 seconds
-        assertTrue(latch.await(70, TimeUnit.SECONDS));
-        assertEquals(0, latch.getCount());
-
-        for (int i = 0; i < COUNT; i++) {
-            scheduler.schedule("id" + i, payload, "", time, 0, 0);
-        }
-
-        LOG.info("Number of journal log files: {}", getNumJournalFiles());
-        // need a little slack so go over 60 seconds
-        assertTrue(latch.await(70, TimeUnit.SECONDS));
-        assertEquals(0, latch.getCount());
-
-        assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getNumJournalFiles() == 1;
-            }
-        }, TimeUnit.MINUTES.toMillis(2)));
-
-        LOG.info("Number of journal log files: {}", getNumJournalFiles());
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
index df1e7ff..0e0c1d7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
@@ -16,62 +16,50 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
+import junit.framework.TestCase;
+
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JobSchedulerStoreTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreTest.class);
+public class JobSchedulerStoreTest extends TestCase {
 
-    @Test(timeout = 120 * 1000)
     public void testRestart() throws Exception {
         JobSchedulerStore store = new JobSchedulerStoreImpl();
         File directory = new File("target/test/ScheduledDB");
-        IOHelper.mkdirs(directory);
-        IOHelper.deleteChildren(directory);
-        store.setDirectory(directory);
+          IOHelper.mkdirs(directory);
+          IOHelper.deleteChildren(directory);
+          store.setDirectory(directory);
         final int NUMBER = 1000;
         store.start();
-        List<ByteSequence> list = new ArrayList<ByteSequence>();
-        for (int i = 0; i < NUMBER; i++) {
-            ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes());
+        List<ByteSequence>list = new ArrayList<ByteSequence>();
+        for (int i = 0; i < NUMBER;i++ ) {
+            ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes());
             list.add(buff);
         }
-
         JobScheduler js = store.getJobScheduler("test");
         js.startDispatching();
         int count = 0;
-        long startTime = 10 * 60 * 1000;
-        long period = startTime;
-        for (ByteSequence job : list) {
-            js.schedule("id:" + (count++), job, "", startTime, period, -1);
+        long startTime = 10 * 60 * 1000; long period = startTime;
+        for (ByteSequence job:list) {
+            js.schedule("id:"+(count++), job, "", startTime, period, -1);
         }
-
-        List<Job> test = js.getAllJobs();
-        LOG.debug("Found {} jobs in the store before restart", test.size());
-        assertEquals(list.size(), test.size());
+        List<Job>test = js.getAllJobs();
+        assertEquals(list.size(),test.size());
         store.stop();
 
         store.start();
         js = store.getJobScheduler("test");
         test = js.getAllJobs();
-        LOG.debug("Found {} jobs in the store after restart", test.size());
-        assertEquals(list.size(), test.size());
-
-        for (int i = 0; i < list.size(); i++) {
+        assertEquals(list.size(),test.size());
+        for (int i = 0; i < list.size();i++) {
             String orig = new String(list.get(i).getData());
             String payload = new String(test.get(i).getPayload());
-            assertEquals(orig, payload);
+            assertEquals(orig,payload);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
index 2210eba..5126970 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
@@ -31,13 +31,8 @@ import org.apache.activemq.util.IOHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class JobSchedulerTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class);
-
     private JobSchedulerStore store;
     private JobScheduler scheduler;
 
@@ -178,37 +173,6 @@ public class JobSchedulerTest {
     }
 
     @Test
-    public void testGetExecutionCount() throws Exception {
-        final String jobId = "Job-1";
-        long time = 10000;
-        final CountDownLatch done = new CountDownLatch(10);
-
-        String str = new String("test");
-        scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10);
-
-        int size = scheduler.getAllJobs().size();
-        assertEquals(size, 1);
-
-        scheduler.addListener(new JobListener() {
-            @Override
-            public void scheduledJob(String id, ByteSequence job) {
-                LOG.info("Job exectued: {}", 11 - done.getCount());
-                done.countDown();
-            }
-        });
-
-        List<Job> jobs = scheduler.getNextScheduleJobs();
-        assertEquals(1, jobs.size());
-        Job job = jobs.get(0);
-        assertEquals(jobId, job.getJobId());
-        assertEquals(0, job.getExecutionCount());
-        assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS));
-        // The job is not updated on the last firing as it is removed from the store following
-        // it's last execution so the count will always be one less than the max firings.
-        assertTrue(job.getExecutionCount() >= 9);
-    }
-
-    @Test
     public void testgetAllJobs() throws Exception {
         final int COUNT = 10;
         final String ID = "id:";

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
deleted file mode 100644
index 2b25797..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.broker.scheduler;
-
-import java.io.File;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Queue;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-/**
- * Base class for tests of the Broker's JobSchedulerStore.
- */
-public class JobSchedulerTestSupport {
-
-    @Rule public TestName name = new TestName();
-
-    protected String connectionUri;
-    protected BrokerService broker;
-    protected JobScheduler jobScheduler;
-    protected Queue destination;
-
-    @Before
-    public void setUp() throws Exception {
-        connectionUri = "vm://localhost";
-        destination = new ActiveMQQueue(name.getMethodName());
-
-        broker = createBroker();
-        broker.start();
-        broker.waitUntilStarted();
-
-        jobScheduler = broker.getJobSchedulerStore().getJobScheduler("JMS");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-            broker.waitUntilStopped();
-        }
-    }
-
-    protected Connection createConnection() throws Exception {
-        return createConnectionFactory().createConnection();
-    }
-
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(connectionUri);
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(true);
-    }
-
-    protected boolean isUseJmx() {
-        return false;
-    }
-
-    protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
-        ObjectName objectName = broker.getAdminView().getJMSJobScheduler();
-        JobSchedulerViewMBean scheduler = null;
-        if (objectName != null) {
-            scheduler = (JobSchedulerViewMBean) broker.getManagementContext()
-                .newProxyInstance(objectName, JobSchedulerViewMBean.class, true);
-        }
-
-        return scheduler;
-    }
-
-    protected BrokerService createBroker(boolean delete) throws Exception {
-        File schedulerDirectory = new File("target/scheduler");
-        if (delete) {
-            IOHelper.mkdirs(schedulerDirectory);
-            IOHelper.deleteChildren(schedulerDirectory);
-        }
-
-        BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerDirectoryFile(schedulerDirectory);
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(isUseJmx());
-        return answer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
deleted file mode 100644
index 9d0fef2..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerIndexRebuildTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package org.apache.activemq.broker.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KahaDBSchedulerIndexRebuildTest {
-
-    static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class);
-
-    private BrokerService broker = null;
-    private final int NUM_JOBS = 50;
-
-    static String basedir;
-    static {
-        try {
-            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
-            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath();
-        } catch (IOException e) {
-            basedir = ".";
-        }
-    }
-
-    private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler");
-    private final File storeDir = new File(basedir, "activemq-data/store/");
-
-    @Before
-    public void setUp() throws Exception {
-        LOG.info("Test Dir = {}", schedulerStoreDir);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-        }
-    }
-
-    @Test
-    public void testIndexRebuilds() throws Exception {
-        IOHelper.deleteFile(schedulerStoreDir);
-
-        JobSchedulerStoreImpl schedulerStore = createScheduler();
-        broker = createBroker(schedulerStore);
-        broker.start();
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        Connection connection = cf.createConnection();
-        connection.start();
-        for (int i = 0; i < NUM_JOBS; ++i) {
-            scheduleRepeating(connection);
-        }
-        connection.close();
-
-        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
-        assertNotNull(scheduler);
-        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
-
-        broker.stop();
-
-        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
-
-        schedulerStore = createScheduler();
-        broker = createBroker(schedulerStore);
-        broker.start();
-
-        scheduler = schedulerStore.getJobScheduler("JMS");
-        assertNotNull(scheduler);
-        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
-    }
-
-    @Test
-    public void testIndexRebuildsAfterSomeJobsExpire() throws Exception {
-        IOHelper.deleteFile(schedulerStoreDir);
-
-        JobSchedulerStoreImpl schedulerStore = createScheduler();
-        broker = createBroker(schedulerStore);
-        broker.start();
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        Connection connection = cf.createConnection();
-        connection.start();
-        for (int i = 0; i < NUM_JOBS; ++i) {
-            scheduleRepeating(connection);
-            scheduleOneShot(connection);
-        }
-        connection.close();
-
-        JobScheduler scheduler = schedulerStore.getJobScheduler("JMS");
-        assertNotNull(scheduler);
-        assertEquals(NUM_JOBS * 2, scheduler.getAllJobs().size());
-
-        final JobScheduler awaitingOneShotTimeout = scheduler;
-        assertTrue("One shot jobs should time out", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return awaitingOneShotTimeout.getAllJobs().size() == NUM_JOBS;
-            }
-        }, TimeUnit.MINUTES.toMillis(2)));
-
-        broker.stop();
-
-        IOHelper.delete(new File(schedulerStoreDir, "scheduleDB.data"));
-
-        schedulerStore = createScheduler();
-        broker = createBroker(schedulerStore);
-        broker.start();
-
-        scheduler = schedulerStore.getJobScheduler("JMS");
-        assertNotNull(scheduler);
-        assertEquals(NUM_JOBS, scheduler.getAllJobs().size());
-    }
-
-    private void scheduleRepeating(Connection connection) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("test.queue");
-        MessageProducer producer = session.createProducer(queue);
-
-        TextMessage message = session.createTextMessage("test msg");
-        long time = 360 * 1000;
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
-        producer.send(message);
-        producer.close();
-    }
-
-    private void scheduleOneShot(Connection connection) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("test.queue");
-        MessageProducer producer = session.createProducer(queue);
-
-        TextMessage message = session.createTextMessage("test msg");
-        long time = TimeUnit.SECONDS.toMillis(30);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
-        producer.send(message);
-        producer.close();
-    }
-
-    protected JobSchedulerStoreImpl createScheduler() {
-        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
-        scheduler.setDirectory(schedulerStoreDir);
-        scheduler.setJournalMaxFileLength(10 * 1024);
-        return scheduler;
-    }
-
-    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
-        BrokerService answer = new BrokerService();
-        answer.setJobSchedulerStore(scheduler);
-        answer.setPersistent(true);
-        answer.setDataDirectory(storeDir.getAbsolutePath());
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        return answer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
deleted file mode 100644
index 30da10d..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/KahaDBSchedulerMissingJournalLogsTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.broker.scheduler;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *Test that the store recovers even if some log files are missing.
- */
-public class KahaDBSchedulerMissingJournalLogsTest {
-
-    static final Logger LOG = LoggerFactory.getLogger(KahaDBSchedulerIndexRebuildTest.class);
-
-    private BrokerService broker = null;
-    private JobSchedulerStoreImpl schedulerStore = null;
-
-    private final int NUM_LOGS = 6;
-
-    static String basedir;
-    static {
-        try {
-            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
-            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../.").getCanonicalPath();
-        } catch (IOException e) {
-            basedir = ".";
-        }
-    }
-
-    private final File schedulerStoreDir = new File(basedir, "activemq-data/store/scheduler");
-    private final File storeDir = new File(basedir, "activemq-data/store/");
-
-    /**
-     * @throws java.lang.Exception
-     */
-    @Before
-    public void setUp() throws Exception {
-        IOHelper.deleteFile(schedulerStoreDir);
-        LOG.info("Test Dir = {}", schedulerStoreDir);
-
-        createBroker();
-        broker.start();
-        broker.waitUntilStarted();
-
-        schedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
-    }
-
-    /**
-     * @throws java.lang.Exception
-     */
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-            broker.waitUntilStopped();
-        }
-    }
-
-    @Test(timeout=120 * 1000)
-    public void testMissingLogsCausesBrokerToFail() throws Exception {
-        fillUpSomeLogFiles();
-
-        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
-        LOG.info("There are {} jobs in the store.", jobCount);
-
-        List<File> toDelete = new ArrayList<File>();
-        Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap();
-        for (int i = files.size(); i > files.size() / 2; i--) {
-            toDelete.add(files.get(i).getFile());
-        }
-
-        broker.stop();
-        broker.waitUntilStopped();
-
-        for (File file : toDelete) {
-            LOG.info("File to delete: {}", file);
-            IOHelper.delete(file);
-        }
-
-        try {
-            createBroker();
-            fail("Should not start when logs are missing.");
-        } catch (Exception e) {
-        }
-    }
-
-    @Test(timeout=120 * 1000)
-    public void testRecoverWhenSomeLogsAreMissing() throws Exception {
-        fillUpSomeLogFiles();
-
-        int jobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
-        LOG.info("There are {} jobs in the store.", jobCount);
-
-        List<File> toDelete = new ArrayList<File>();
-        Map<Integer, DataFile> files = schedulerStore.getJournal().getFileMap();
-        for (int i = files.size() - 1; i > files.size() / 2; i--) {
-            toDelete.add(files.get(i).getFile());
-        }
-
-        broker.stop();
-        broker.waitUntilStopped();
-
-        for (File file : toDelete) {
-            LOG.info("File to delete: {}", file);
-            IOHelper.delete(file);
-        }
-
-        schedulerStore = createScheduler();
-        schedulerStore.setIgnoreMissingJournalfiles(true);
-
-        createBroker(schedulerStore);
-        broker.start();
-        broker.waitUntilStarted();
-
-        int postRecoverJobCount = schedulerStore.getJobScheduler("JMS").getAllJobs().size();
-        assertTrue(postRecoverJobCount > 0);
-        assertTrue(postRecoverJobCount < jobCount);
-    }
-
-    private void fillUpSomeLogFiles() throws Exception {
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        Connection connection = cf.createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("test.queue");
-        MessageProducer producer = session.createProducer(queue);
-        connection.start();
-        while (true) {
-            scheduleRepeating(session, producer);
-            if (schedulerStore.getJournal().getFileMap().size() == NUM_LOGS) {
-                break;
-            }
-        }
-        connection.close();
-    }
-
-    private void scheduleRepeating(Session session, MessageProducer producer) throws Exception {
-        TextMessage message = session.createTextMessage("test msg");
-        long time = 360 * 1000;
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
-        producer.send(message);
-    }
-
-    protected JobSchedulerStoreImpl createScheduler() {
-        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
-        scheduler.setDirectory(schedulerStoreDir);
-        scheduler.setJournalMaxFileLength(10 * 1024);
-        return scheduler;
-    }
-
-    protected void createBroker() throws Exception {
-        createBroker(createScheduler());
-    }
-
-    protected void createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
-        broker = new BrokerService();
-        broker.setJobSchedulerStore(scheduler);
-        broker.setPersistent(true);
-        broker.setDataDirectory(storeDir.getAbsolutePath());
-        broker.setSchedulerSupport(true);
-        broker.setUseJmx(false);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
deleted file mode 100644
index 721f417..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/SchedulerDBVersionTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.scheduler;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SchedulerDBVersionTest {
-    static String basedir;
-    static {
-        try {
-            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
-            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath();
-        } catch (IOException e) {
-            basedir = ".";
-        }
-    }
-
-    static final Logger LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class);
-    final static File VERSION_LEGACY_JMS =
-        new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
-
-    BrokerService broker = null;
-
-    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
-        BrokerService answer = new BrokerService();
-        answer.setJobSchedulerStore(scheduler);
-        answer.setPersistent(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        return answer;
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-        }
-    }
-
-    @Ignore("Used only when a new version of the store needs to archive it's test data.")
-    @Test
-    public void testCreateStore() throws Exception {
-        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
-        File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
-        IOHelper.deleteFile(dir);
-        scheduler.setDirectory(dir);
-        scheduler.setJournalMaxFileLength(1024 * 1024);
-        broker = createBroker(scheduler);
-        broker.start();
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        Connection connection = cf.createConnection();
-        connection.start();
-        scheduleRepeating(connection);
-        connection.close();
-        broker.stop();
-    }
-
-    private void scheduleRepeating(Connection connection) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("test.queue");
-        MessageProducer producer = session.createProducer(queue);
-
-        TextMessage message = session.createTextMessage("test msg");
-        long time = 1000;
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
-        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
-        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
-        producer.send(message);
-        producer.close();
-    }
-
-    @Test
-    public void testLegacyStoreConversion() throws Exception {
-        doTestScheduleRepeated(VERSION_LEGACY_JMS);
-    }
-
-    public void doTestScheduleRepeated(File existingStore) throws Exception {
-        File testDir = new File("target/activemq-data/store/scheduler/versionDB");
-        IOHelper.deleteFile(testDir);
-        IOHelper.copyFile(existingStore, testDir);
-
-        final int NUMBER = 10;
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-
-        for (int i = 0; i < 3; ++i) {
-            JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
-            scheduler.setDirectory(testDir);
-            scheduler.setJournalMaxFileLength(1024 * 1024);
-            BrokerService broker = createBroker(scheduler);
-            broker.start();
-            broker.waitUntilStarted();
-
-            final AtomicInteger count = new AtomicInteger();
-            Connection connection = cf.createConnection();
-
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue("test.queue");
-
-            MessageConsumer consumer = session.createConsumer(queue);
-
-            final CountDownLatch latch = new CountDownLatch(NUMBER);
-            consumer.setMessageListener(new MessageListener() {
-                @Override
-                public void onMessage(Message message) {
-                    LOG.info("Received scheduled message: {}", message);
-                    latch.countDown();
-                    count.incrementAndGet();
-                }
-            });
-
-            connection.start();
-            assertEquals(latch.getCount(), NUMBER);
-            latch.await(30, TimeUnit.SECONDS);
-
-            connection.close();
-            broker.stop();
-            broker.waitUntilStopped();
-
-            assertEquals(0, latch.getCount());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties
index 85516aa..564ed9e 100755
--- a/activemq-unit-tests/src/test/resources/log4j.properties
+++ b/activemq-unit-tests/src/test/resources/log4j.properties
@@ -21,7 +21,6 @@
 log4j.rootLogger=INFO, out, stdout
 
 #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
-#log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log
deleted file mode 100644
index 342f8c7..0000000
Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/db-1.log and /dev/null differ

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
deleted file mode 100644
index 30c937d..0000000
Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data and /dev/null differ

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
deleted file mode 100644
index b06e549..0000000
Binary files a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo and /dev/null differ


Mime
View raw message