activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5271
Date Thu, 10 Jul 2014 16:17:09 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 2b53036b2 -> 433912f79


https://issues.apache.org/jira/browse/AMQ-5271

Add an in-memory variant of the disk based JobScheduler store to allow
for an embedded broker to have scheduler support without needing to use
the disk based version. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/433912f7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/433912f7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/433912f7

Branch: refs/heads/trunk
Commit: 433912f79a3ffb2323bc0060a980e77122f9a204
Parents: 2b53036
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Jul 10 12:16:34 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Jul 10 12:16:34 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  12 +-
 .../broker/scheduler/memory/InMemoryJob.java    | 151 ++++++
 .../scheduler/memory/InMemoryJobScheduler.java  | 482 +++++++++++++++++++
 .../memory/InMemoryJobSchedulerStore.java       | 119 +++++
 .../broker/scheduler/JmsCronSchedulerTest.java  |  57 +--
 .../broker/scheduler/JmsSchedulerTest.java      |  46 +-
 .../JobSchedulerJmxManagementTests.java         |   2 +-
 .../broker/scheduler/JobSchedulerTest.java      |   5 +-
 .../scheduler/JobSchedulerTestSupport.java      |   6 +-
 .../broker/scheduler/JobSchedulerTxTest.java    |  52 +-
 .../memory/InMemeoryJmsSchedulerTest.java       |  40 ++
 .../memory/InMemoryJmsCronSchedulerTest.java    |  30 ++
 .../InMemoryJobSchedulerJmxManagementTests.java |  30 ++
 .../InMemoryJobSchedulerManagementTest.java     |  30 ++
 .../memory/InMemoryJobSchedulerStoreTest.java   |  74 +++
 .../memory/InMemoryJobSchedulerTest.java        |  36 ++
 .../memory/InMemoryJobSchedulerTxTest.java      |  30 ++
 17 files changed, 1061 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 1ff4a21..50589b8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -87,6 +87,7 @@ import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.broker.scheduler.SchedulerBroker;
+import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.BrokerId;
@@ -1863,7 +1864,14 @@ public class BrokerService implements Service {
         if (jobSchedulerStore == null) {
 
             if (!isPersistent()) {
-                return null;
+                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
+                configureService(jobSchedulerStore);
+                try {
+                    jobSchedulerStore.start();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                return this.jobSchedulerStore;
             }
 
             try {
@@ -2799,7 +2807,7 @@ public class BrokerService implements Service {
      * @return the schedulerSupport
      */
     public boolean isSchedulerSupport() {
-        return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);
+        return this.schedulerSupport;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
new file mode 100644
index 0000000..9a4b012
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJob.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobSupport;
+
+/**
+ * A simple in memory Job POJO.
+ */
+public class InMemoryJob implements Job {
+
+    private final String jobId;
+
+    private int repeat;
+    private long start;
+    private long nextTime;
+    private long delay;
+    private long period;
+    private String cronEntry;
+    private int executionCount;
+
+    private byte[] payload;
+
+    public InMemoryJob(String jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public String getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public int getRepeat() {
+        return repeat;
+    }
+
+    public void setRepeat(int repeat) {
+        this.repeat = repeat;
+    }
+
+    @Override
+    public long getStart() {
+        return start;
+    }
+
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    public long getNextTime() {
+        return nextTime;
+    }
+
+    public void setNextTime(long nextTime) {
+        this.nextTime = nextTime;
+    }
+
+    @Override
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    @Override
+    public long getPeriod() {
+        return period;
+    }
+
+    public void setPeriod(long period) {
+        this.period = period;
+    }
+
+    @Override
+    public String getCronEntry() {
+        return cronEntry;
+    }
+
+    public void setCronEntry(String cronEntry) {
+        this.cronEntry = cronEntry;
+    }
+
+    @Override
+    public byte[] getPayload() {
+        return payload;
+    }
+
+    public void setPayload(byte[] payload) {
+        this.payload = payload;
+    }
+
+    @Override
+    public String getStartTime() {
+        return JobSupport.getDateTime(getStart());
+    }
+
+    @Override
+    public String getNextExecutionTime() {
+        return JobSupport.getDateTime(getNextTime());
+    }
+
+    @Override
+    public int getExecutionCount() {
+        return executionCount;
+    }
+
+    public void incrementExecutionCount() {
+        this.executionCount++;
+    }
+
+    public void decrementRepeatCount() {
+        if (this.repeat > 0) {
+            this.repeat--;
+        }
+    }
+
+    /**
+     * @return true if this Job represents a Cron entry.
+     */
+    public boolean isCron() {
+        return getCronEntry() != null && getCronEntry().length() > 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return jobId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "Job: " + getJobId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
new file mode 100644
index 0000000..41be322
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler.memory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.MessageFormatException;
+
+import org.apache.activemq.broker.scheduler.CronParser;
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobListener;
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSupport;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements an in-memory JobScheduler instance.
+ */
+public class InMemoryJobScheduler implements JobScheduler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobScheduler.class);
+
+    private static final IdGenerator ID_GENERATOR = new IdGenerator();
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final String name;
+    private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>();
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
+    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+    private final Timer timer = new Timer();
+
+    public InMemoryJobScheduler(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String getName() throws Exception {
+        return name;
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            startDispatching();
+            LOG.trace("JobScheduler[{}] started", name);
+        }
+    }
+
+    /**
+     * Stops this scheduler: disables dispatching, cancels the Timer and
+     * discards all pending jobs. Calling stop on a scheduler that is not
+     * started does nothing.
+     *
+     * @throws Exception if dispatching cannot be stopped.
+     */
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            stopDispatching();
+            timer.cancel();
+
+            // The jobs map is a plain TreeMap that every other method only
+            // touches while holding the read/write lock, so clear it under
+            // the write lock too; a ScheduledTask or another caller may be
+            // using the map at the same moment.
+            lock.writeLock().lock();
+            try {
+                jobs.clear();
+            } finally {
+                lock.writeLock().unlock();
+            }
+            LOG.trace("JobScheduler[{}] stopped", name);
+        }
+    }
+
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    public boolean isDispatchEnabled() {
+        return dispatchEnabled.get();
+    }
+
+    @Override
+    public void startDispatching() throws Exception {
+        dispatchEnabled.set(true);
+    }
+
+    @Override
+    public void stopDispatching() throws Exception {
+        dispatchEnabled.set(false);
+    }
+
+    @Override
+    public void addListener(JobListener listener) throws Exception {
+        this.jobListeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(JobListener listener) throws Exception {
+        this.jobListeners.remove(listener);
+    }
+
+    @Override
+    public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
+        doSchedule(jobId, payload, "", 0, delay, 0);
+    }
+
+    @Override
+    public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
+        doSchedule(jobId, payload, cronEntry, 0, 0, 0);
+    }
+
+    @Override
+    public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception {
+        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
+    }
+
+    @Override
+    public void remove(long time) throws Exception {
+        doRemoveRange(time, time);
+    }
+
+    @Override
+    public void remove(String jobId) throws Exception {
+        doRemoveJob(jobId);
+    }
+
+    @Override
+    public void removeAllJobs() throws Exception {
+        doRemoveRange(0, Long.MAX_VALUE);
+    }
+
+    @Override
+    public void removeAllJobs(long start, long finish) throws Exception {
+        doRemoveRange(start, finish);
+    }
+
+    @Override
+    public long getNextScheduleTime() throws Exception {
+        lock.readLock().lock();
+        try {
+            // The map is ordered by execution time, so its first key is the
+            // earliest pending slot; -1 signals that nothing is scheduled.
+            return jobs.isEmpty() ? -1L : jobs.firstKey();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public List<Job> getNextScheduleJobs() throws Exception {
+        final List<Job> nextJobs = new ArrayList<Job>();
+        lock.readLock().lock();
+        try {
+            // firstEntry() yields null for an empty map, in which case the
+            // returned list simply stays empty.
+            final Map.Entry<Long, ScheduledTask> earliest = jobs.firstEntry();
+            if (earliest != null) {
+                nextJobs.addAll(earliest.getValue().getAllJobs());
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+        return nextJobs;
+    }
+
+    @Override
+    public List<Job> getAllJobs() throws Exception {
+        final List<Job> result = new ArrayList<Job>();
+        this.lock.readLock().lock();
+        try {
+            for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
+                result.addAll(entry.getValue().getAllJobs());
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        return result;
+    }
+
+    @Override
+    public List<Job> getAllJobs(long start, long finish) throws Exception {
+        final List<Job> result = new ArrayList<Job>();
+        this.lock.readLock().lock();
+        try {
+            for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
+                long jobTime = entry.getKey();
+                if (start <= jobTime && jobTime <= finish) {
+                    result.addAll(entry.getValue().getAllJobs());
+                }
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+        return result;
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "JobScheduler: " + name;
+    }
+
+    /**
+     * Creates a new job and registers it in the time slot in which it should fire.
+     * <p>
+     * The start time is the current time truncated to a whole second. When a
+     * cron entry is given the base execution time comes from the CronParser,
+     * otherwise it is the start time; the delay (or, when no positive delay is
+     * given, the period) is then added to it. Jobs that share an execution time
+     * are grouped into a single ScheduledTask.
+     *
+     * @param jobId
+     *        the ID assigned to the new job.
+     * @param payload
+     *        the data handed to listeners when the job fires.
+     * @param cronEntry
+     *        an optional cron expression, null or empty for none.
+     * @param delay
+     *        initial delay in milliseconds.
+     * @param period
+     *        repeat period in milliseconds.
+     * @param repeat
+     *        the repeat count stored on the job.
+     *
+     * @throws IOException if the cron entry cannot be parsed; the parser's
+     *         exception is attached as the cause.
+     */
+    private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
+        long startTime = System.currentTimeMillis();
+        long executionTime = 0;
+        // round startTime - so we can schedule more jobs at the same time
+        startTime = (startTime / 1000) * 1000;
+        if (cronEntry != null && cronEntry.length() > 0) {
+            try {
+                executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
+            } catch (MessageFormatException e) {
+                // Keep the original exception as the cause so the parse failure
+                // and its stack trace are not lost to the caller.
+                throw new IOException(e.getMessage(), e);
+            }
+        }
+
+        if (executionTime == 0) {
+            // start time not set by CRON - so set it to the current time
+            executionTime = startTime;
+        }
+
+        if (delay > 0) {
+            executionTime += delay;
+        } else {
+            executionTime += period;
+        }
+
+        InMemoryJob newJob = new InMemoryJob(jobId);
+        newJob.setStart(startTime);
+        newJob.setCronEntry(cronEntry);
+        newJob.setDelay(delay);
+        newJob.setPeriod(period);
+        newJob.setRepeat(repeat);
+        newJob.setNextTime(executionTime);
+        newJob.setPayload(payload.getData());
+
+        LOG.trace("JobScheduler adding job[{}] to fire at: {}", jobId, JobSupport.getDateTime(executionTime));
+
+        lock.writeLock().lock();
+        try {
+            ScheduledTask task = jobs.get(executionTime);
+            if (task == null) {
+                // First job for this time slot: create the task and hand it to the Timer.
+                task = new ScheduledTask(executionTime);
+                task.add(newJob);
+                jobs.put(task.getExecutionTime(), task);
+                timer.schedule(task, new Date(newJob.getNextTime()));
+            } else {
+                // A task already exists for this slot, so the job just joins it.
+                task.add(newJob);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void doReschedule(InMemoryJob job, long nextExecutionTime) {
+        job.setNextTime(nextExecutionTime);
+        job.incrementExecutionCount();
+        job.decrementRepeatCount();
+
+        LOG.trace("JobScheduler rescheduling job[{}] to fire at: {}", job.getJobId(), JobSupport.getDateTime(nextExecutionTime));
+
+        lock.writeLock().lock();
+        try {
+            ScheduledTask task = jobs.get(nextExecutionTime);
+            if (task == null) {
+                task = new ScheduledTask(nextExecutionTime);
+                task.add(job);
+                jobs.put(task.getExecutionTime(), task);
+                timer.schedule(task, new Date(task.getExecutionTime()));
+            } else {
+                task.add(job);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+    }
+
+    private void doRemoveJob(String jobId) throws IOException {
+        this.lock.writeLock().lock();
+        try {
+            Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
+            while (scheduled.hasNext()) {
+                Map.Entry<Long, ScheduledTask> entry = scheduled.next();
+                ScheduledTask task = entry.getValue();
+                if (task.remove(jobId)) {
+                    LOG.trace("JobScheduler removing job[{}]", jobId);
+                    if (task.isEmpty()) {
+                        task.cancel();
+                        scheduled.remove();
+                    }
+                    return;
+                }
+            }
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+    }
+
+    private void doRemoveRange(long start, long end) throws IOException {
+        this.lock.writeLock().lock();
+        try {
+            Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
+            while (scheduled.hasNext()) {
+                Map.Entry<Long, ScheduledTask> entry = scheduled.next();
+                long executionTime = entry.getKey();
+                if (start <= executionTime && executionTime <= end) {
+                    ScheduledTask task = entry.getValue();
+                    task.cancel();
+                    scheduled.remove();
+                }
+
+                // Don't look beyond the end range.
+                if (end < executionTime) {
+                    break;
+                }
+            }
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+    }
+
+    private boolean canDispatch() {
+        return isStarted() && isDispatchEnabled();
+    }
+
+    private long calculateNextExecutionTime(InMemoryJob job, long currentTime, int repeat) throws MessageFormatException {
+        long result = currentTime;
+        String cron = job.getCronEntry();
+        if (cron != null && cron.length() > 0) {
+            result = CronParser.getNextScheduledTime(cron, result);
+        } else if (job.getRepeat() != 0) {
+            result += job.getPeriod();
+        }
+        return result;
+    }
+
+    private void dispatch(InMemoryJob job) throws IllegalStateException, IOException {
+        if (canDispatch()) {
+            LOG.debug("Firing: {}", job);
+            for (JobListener l : jobListeners) {
+                l.scheduledJob(job.getJobId(), new ByteSequence(job.getPayload()));
+            }
+        }
+    }
+
+    /*
+     * A TimerTask instance that can aggregate the execution of a number of
+     * scheduled Jobs and handle rescheduling the jobs that require it.
+     */
+    private class ScheduledTask extends TimerTask {
+
+        private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>();
+        private final long executionTime;
+
+        public ScheduledTask(long executionTime) {
+            this.executionTime = executionTime;
+        }
+
+        public long getExecutionTime() {
+            return executionTime;
+        }
+
+        /**
+         * @return a Collection containing all the managed jobs for this task.
+         */
+        public Collection<InMemoryJob> getAllJobs() {
+            return new ArrayList<InMemoryJob>(jobs.values());
+        }
+
+        /**
+         * @return true if the internal list of jobs has become empty.
+         */
+        public boolean isEmpty() {
+            return jobs.isEmpty();
+        }
+
+        /**
+         * Adds the job to the internal list of scheduled Jobs managed by this task.
+         *
+         * @param newJob
+         *        the new job to add to the list of Jobs.
+         */
+        public void add(InMemoryJob newJob) {
+            this.jobs.put(newJob.getJobId(), newJob);
+        }
+
+        /**
+         * Removes the job from the internal list of scheduled Jobs managed by this task.
+         *
+         * @param jobId
+         *        the job ID to remove from the list of Jobs.
+         *
+         * @return true if the job was removed from the list of managed jobs.
+         */
+        public boolean remove(String jobId) {
+            return jobs.remove(jobId) != null;
+        }
+
+        @Override
+        public void run() {
+            if (!isStarted()) {
+                return;
+            }
+
+            try {
+                long currentTime = System.currentTimeMillis();
+                lock.writeLock().lock();
+                try {
+                    // Remove this entry as it will now fire any scheduled jobs, if new
+                    // jobs or rescheduled jobs land in the same time slot we want them
+                    // to go into a new ScheduledTask in the Timer instance.
+                    InMemoryJobScheduler.this.jobs.remove(executionTime);
+                } finally {
+                    lock.writeLock().unlock();
+                }
+
+                long nextExecutionTime = 0;
+
+                for (InMemoryJob job : jobs.values()) {
+
+                    if (!isStarted()) {
+                        break;
+                    }
+
+                    int repeat = job.getRepeat();
+                    nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
+                    if (!job.isCron()) {
+                        dispatch(job);
+                        if (repeat != 0) {
+                            // Reschedule for the next time, the scheduler will take care of
+                            // updating the repeat counter on the update.
+                            doReschedule(job, nextExecutionTime);
+                        }
+                    } else {
+                        if (repeat == 0) {
+                            // This is a non-repeating Cron entry so we can fire and forget it.
+                            dispatch(job);
+                        }
+
+                        if (nextExecutionTime > currentTime) {
+                            // Reschedule the cron job as a new event, if the cron entry signals
+                            // a repeat then it will be stored separately and fired as a normal
+                            // event with decrementing repeat.
+                            doReschedule(job, nextExecutionTime);
+
+                            if (repeat != 0) {
+                                // we have a separate schedule to run at this time
+                                // so the cron job is used to set off a separate schedule
+                                // hence we won't fire the original cron job to the
+                                // listeners but we do need to start a separate schedule
+                                String jobId = ID_GENERATOR.generateId();
+                                ByteSequence payload = new ByteSequence(job.getPayload());
+                                schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable e) {
+                LOG.error("Error while processing scheduled job(s).", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStore.java
new file mode 100644
index 0000000..dd8e85a
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStore.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler.memory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An in-memory JobSchedulerStore implementation used for Brokers that have persistence
+ * disabled or when the JobSchedulerStore usage doesn't require a file or DB based store
+ * implementation allowing for better performance.
+ */
+public class InMemoryJobSchedulerStore extends ServiceSupport implements JobSchedulerStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStore.class);
+
+    // Guards every access to the schedulers map, which is a plain HashMap.
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Map<String, InMemoryJobScheduler> schedulers = new HashMap<String, InMemoryJobScheduler>();
+
+    /**
+     * Stops every scheduler currently held by this store. A failure to stop one
+     * scheduler is logged and does not prevent the others from being stopped.
+     */
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        // Iterate under the lock: getJobScheduler and removeJobScheduler mutate
+        // the map and could otherwise invalidate this iteration.
+        this.lock.lock();
+        try {
+            for (InMemoryJobScheduler scheduler : schedulers.values()) {
+                try {
+                    scheduler.stop();
+                } catch (Exception e) {
+                    LOG.error("Failed to stop scheduler: {}", scheduler.getName(), e);
+                }
+            }
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    /**
+     * Starts every scheduler currently held by this store. A failure to start
+     * one scheduler is logged and does not prevent the others from starting.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        // Same locking rule as doStop: the map may be modified concurrently.
+        this.lock.lock();
+        try {
+            for (InMemoryJobScheduler scheduler : schedulers.values()) {
+                try {
+                    scheduler.start();
+                } catch (Exception e) {
+                    LOG.error("Failed to start scheduler: {}", scheduler.getName(), e);
+                }
+            }
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the scheduler registered under the given name, creating it on
+     * first use. A newly created scheduler is started right away when this
+     * store reports itself as started.
+     *
+     * @param name
+     *        the name of the scheduler to look up or create.
+     *
+     * @return the existing or newly created scheduler for that name.
+     */
+    @Override
+    public JobScheduler getJobScheduler(String name) throws Exception {
+        this.lock.lock();
+        try {
+            InMemoryJobScheduler result = this.schedulers.get(name);
+            if (result == null) {
+                LOG.debug("Creating new in-memory scheduler: {}", name);
+                result = new InMemoryJobScheduler(name);
+                this.schedulers.put(name, result);
+                if (isStarted()) {
+                    result.start();
+                }
+            }
+            return result;
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    /**
+     * Removes and stops the scheduler registered under the given name.
+     *
+     * @param name
+     *        the name of the scheduler to remove.
+     *
+     * @return true if a scheduler with that name existed and was removed.
+     */
+    @Override
+    public boolean removeJobScheduler(String name) throws Exception {
+        boolean result = false;
+
+        this.lock.lock();
+        try {
+            // remove() both looks up and unregisters the scheduler, so no
+            // second removal is needed once it has been stopped.
+            InMemoryJobScheduler scheduler = this.schedulers.remove(name);
+            result = scheduler != null;
+            if (result) {
+                LOG.debug("Removing in-memory Job Scheduler: {}", name);
+                scheduler.stop();
+            }
+        } finally {
+            this.lock.unlock();
+        }
+        return result;
+    }
+
+    //---------- Methods that don't really apply to this implementation ------//
+
+    /**
+     * @return always 0, this store does not track an on-disk size.
+     */
+    @Override
+    public long size() {
+        return 0;
+    }
+
+    /**
+     * @return always null, this store has no backing directory.
+     */
+    @Override
+    public File getDirectory() {
+        return null;
+    }
+
+    /**
+     * Ignored, this store has no backing directory.
+     */
+    @Override
+    public void setDirectory(File directory) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
index 7bfdeb4..44ef670 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
@@ -16,7 +16,12 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -32,25 +37,12 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@RunWith(BlockJUnit4ClassRunner.class)
-public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
-
-    @Rule
-    public TestName testName = new TestName();
+public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsCronSchedulerTest.class);
 
@@ -123,39 +115,4 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
         assertNotNull(consumer.receiveNoWait());
         assertNull(consumer.receiveNoWait());
     }
-
-    @Before
-    public void setUp() throws Exception {
-        LOG.info("Starting test {}", testName.getMethodName());
-        bindAddress = "vm://localhost";
-        super.setUp();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(true);
-    }
-
-    protected BrokerService createBroker(boolean delete) throws Exception {
-        File schedulerDirectory = new File("target/scheduler");
-        if (delete) {
-            IOHelper.mkdirs(schedulerDirectory);
-            IOHelper.deleteChildren(schedulerDirectory);
-        }
-        BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
-        answer.getManagementContext().setCreateConnector(false);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerDirectoryFile(schedulerDirectory);
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        answer.addConnector(bindAddress);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index be4c9ea..0ce584d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -32,15 +34,14 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ProducerThread;
 import org.apache.activemq.util.Wait;
+import org.junit.Test;
 
-public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
+public class JmsSchedulerTest extends JobSchedulerTestSupport {
 
+    @Test
     public void testCron() throws Exception {
         final int COUNT = 10;
         final AtomicInteger count = new AtomicInteger();
@@ -80,6 +81,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
         assertEquals(COUNT, count.get());
     }
 
+    @Test
     public void testSchedule() throws Exception {
         final int COUNT = 1;
         Connection connection = createConnection();
@@ -112,6 +114,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
         assertEquals(latch.getCount(), 0);
     }
 
+    @Test
     public void testTransactedSchedule() throws Exception {
         final int COUNT = 1;
         Connection connection = createConnection();
@@ -150,7 +153,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
         assertEquals(latch.getCount(), 0);
     }
 
-
+    @Test
     public void testScheduleRepeated() throws Exception {
         final int NUMBER = 10;
         final AtomicInteger count = new AtomicInteger();
@@ -186,6 +189,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
         assertEquals(NUMBER, count.get());
     }
 
+    @Test
     public void testScheduleRestart() throws Exception {
         // send a message
         Connection connection = createConnection();
@@ -222,12 +226,12 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
         producer.close();
     }
 
+    @Test
     public void testJobSchedulerStoreUsage() throws Exception {
 
         // Shrink the store limit down so we get the producer to block
         broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
 
-
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
         Connection conn = factory.createConnection();
         conn.start();
@@ -281,32 +285,4 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
 
         assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
     }
-
-    @Override
-    protected void setUp() throws Exception {
-        bindAddress = "vm://localhost";
-        super.setUp();
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(true);
-    }
-
-    protected BrokerService createBroker(boolean delete) throws Exception {
-        File schedulerDirectory = new File("target/scheduler");
-        if (delete) {
-            IOHelper.mkdirs(schedulerDirectory);
-            IOHelper.deleteChildren(schedulerDirectory);
-        }
-        BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerDirectoryFile(schedulerDirectory);
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        answer.addConnector(bindAddress);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
index 8adb980..b5d2227 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerJmxManagementTests.java
@@ -104,7 +104,7 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
         long toLate = System.currentTimeMillis() + 63 * 1000;
         String next = view.getNextScheduleTime();
         long nextTime = JobSupport.getDataTime(next);
-        LOG.info("Next Scheduled Time: {}", next);
+        LOG.info("Next Scheduled Time: {} should be after: {}", next, JobSupport.getDateTime(before));
         assertTrue(nextTime > before);
         assertTrue(nextTime < toLate);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
index 2210eba..b84a782 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
@@ -275,11 +275,14 @@ public class JobSchedulerTest {
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
         startStore(directory);
+    }
 
+    protected JobSchedulerStore createJobSchedulerStore() throws Exception {
+        return new JobSchedulerStoreImpl(); // default store; subclasses override this to test a different one
     }
 
     protected void startStore(File directory) throws Exception {
-        store = new JobSchedulerStoreImpl();
+        store = createJobSchedulerStore();
         store.setDirectory(directory);
         store.start();
         scheduler = store.getJobScheduler("test");

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
index 2b25797..5bf8d8c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -82,6 +82,10 @@ public class JobSchedulerTestSupport {
         return false;
     }
 
+    protected boolean isPersistent() {
+        return true; // default; handed to BrokerService.setPersistent() when the test broker is built
+    }
+
     protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
         ObjectName objectName = broker.getAdminView().getJMSJobScheduler();
         JobSchedulerViewMBean scheduler = null;
@@ -101,7 +105,7 @@ public class JobSchedulerTestSupport {
         }
 
         BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
+        answer.setPersistent(isPersistent());
         answer.setDeleteAllMessagesOnStartup(true);
         answer.setDataDirectory("target");
         answer.setSchedulerDirectoryFile(schedulerDirectory);

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
index 2210f9d..996cc55 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
@@ -19,7 +19,6 @@ package org.apache.activemq.broker.scheduler;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.File;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -31,34 +30,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-public class JobSchedulerTxTest {
-
-    private BrokerService broker;
-    private final String connectionUri = "vm://localhost";
-    private final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
-    private final ActiveMQQueue destination = new ActiveMQQueue("Target.Queue");
-
-    @Before
-    public void setUp() throws Exception {
-        broker = createBroker();
-        broker.start();
-        broker.waitUntilStarted();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        broker.stop();
-        broker.waitUntilStopped();
-    }
+public class JobSchedulerTxTest extends JobSchedulerTestSupport {
 
     @Test
     public void testTxSendWithRollback() throws Exception {
@@ -129,29 +104,4 @@ public class JobSchedulerTxTest {
         latch.await(5, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
     }
-
-    protected Connection createConnection() throws Exception {
-        return cf.createConnection();
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        return createBroker(true);
-    }
-
-    protected BrokerService createBroker(boolean delete) throws Exception {
-        File schedulerDirectory = new File("target/scheduler");
-        if (delete) {
-            IOHelper.mkdirs(schedulerDirectory);
-            IOHelper.deleteChildren(schedulerDirectory);
-        }
-        BrokerService answer = new BrokerService();
-        answer.setPersistent(true);
-        answer.setDeleteAllMessagesOnStartup(true);
-        answer.setDataDirectory("target");
-        answer.setSchedulerDirectoryFile(schedulerDirectory);
-        answer.setSchedulerSupport(true);
-        answer.setUseJmx(false);
-        answer.addConnector(connectionUri);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
new file mode 100644
index 0000000..5144203
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemeoryJmsSchedulerTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JmsSchedulerTest;
+
+/**
+ * Test for the In-Memory Scheduler variant.
+ */
+public class InMemeoryJmsSchedulerTest extends JmsSchedulerTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false; // run the inherited tests against a non-persistent broker
+    }
+
+    @Override
+    public void testScheduleRestart() throws Exception {
+        // Overridden as a no-op: without persistence, scheduled jobs don't survive a restart.
+    }
+
+    @Override
+    public void testJobSchedulerStoreUsage() throws Exception {
+        // Overridden as a no-op: there are no store usage numbers for the in-memory store.
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
new file mode 100644
index 0000000..a3b7d04
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJmsCronSchedulerTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JmsCronSchedulerTest;
+
+/**
+ * In memory version of the cron scheduler test.
+ */
+public class InMemoryJmsCronSchedulerTest extends JmsCronSchedulerTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false; // run the inherited cron tests against a non-persistent broker
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
new file mode 100644
index 0000000..46f5540
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerJmxManagementTests.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerJmxManagementTests;
+
+/**
+ * Test for the In-Memory scheduler's JMX management features.
+ */
+public class InMemoryJobSchedulerJmxManagementTests extends JobSchedulerJmxManagementTests {
+
+    @Override
+    protected boolean isPersistent() {
+        return false; // run the inherited JMX tests against a non-persistent broker
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
new file mode 100644
index 0000000..e65d819
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerManagementTest;
+
+/**
+ * Tests management of in memory scheduler via JMS client.
+ */
+public class InMemoryJobSchedulerManagementTest extends JobSchedulerManagementTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false; // request a non-persistent broker for the inherited management tests
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
new file mode 100644
index 0000000..ac90070
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that jobs scheduled in the in-memory store do not survive a stop/start of the store.
+ */
+public class InMemoryJobSchedulerStoreTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStoreTest.class);
+
+    @Test(timeout = 120 * 1000)
+    public void testRestart() throws Exception {
+        InMemoryJobSchedulerStore store = new InMemoryJobSchedulerStore();
+        File directory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store.setDirectory(directory);
+        final int NUMBER = 1000;
+        store.start();
+        List<ByteSequence> list = new ArrayList<ByteSequence>();
+        for (int i = 0; i < NUMBER; i++) {
+            list.add(new ByteSequence(("testjob" + i).getBytes()));
+        }
+
+        JobScheduler js = store.getJobScheduler("test");
+        js.startDispatching();
+        int count = 0;
+        long startTime = 10 * 60 * 1000;
+        long period = startTime;
+        for (ByteSequence job : list) {
+            js.schedule("id:" + (count++), job, "", startTime, period, -1);
+        }
+
+        List<Job> test = js.getAllJobs();
+        LOG.debug("Found {} jobs in the store before restart", test.size());
+        assertEquals(list.size(), test.size());
+        // The in-memory store is expected to drop its jobs across a stop/start cycle.
+        store.stop();
+        store.start();
+        js = store.getJobScheduler("test");
+        test = js.getAllJobs();
+        LOG.debug("Found {} jobs in the store after restart", test.size());
+        assertEquals(0, test.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
new file mode 100644
index 0000000..36771b0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.broker.scheduler.JobSchedulerTest;
+
+/**
+ * In-Memory store based variation of the JobSchedulerTest
+ */
+public class InMemoryJobSchedulerTest extends JobSchedulerTest {
+
+    @Override
+    public void testAddStopThenDeliver() throws Exception {
+        // Overridden as a no-op: an in-memory store that's stopped doesn't retain the jobs.
+    }
+
+    @Override
+    protected JobSchedulerStore createJobSchedulerStore() throws Exception {
+        return new InMemoryJobSchedulerStore(); // store used by the inherited test cases
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/433912f7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
new file mode 100644
index 0000000..fb87905
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerTxTest;
+
+/**
+ * In memory version of the TX test case
+ */
+public class InMemoryJobSchedulerTxTest extends JobSchedulerTxTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false; // run the inherited TX tests against a non-persistent broker
+    }
+}


Mime
View raw message