activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [15/16] activemq git commit: https://issues.apache.org/jira/browse/AMQ-3758
Date Tue, 16 Dec 2014 22:10:03 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
new file mode 100644
index 0000000..6003c87
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractKahaDBStore extends LockableServiceSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
+
+    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
+    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
+
+    protected File directory;
+    protected PageFile pageFile;
+    protected Journal journal;
+    protected AtomicLong journalSize = new AtomicLong(0);
+    protected boolean failIfDatabaseIsLocked;
+    protected long checkpointInterval = 5*1000;
+    protected long cleanupInterval = 30*1000;
+    protected boolean checkForCorruptJournalFiles = false;
+    protected boolean checksumJournalFiles = true;
+    protected boolean forceRecoverIndex = false;
+    protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    protected boolean archiveCorruptedIndex = false;
+    protected boolean enableIndexWriteAsync = false;
+    protected boolean enableJournalDiskSyncs = false;
+    protected boolean deleteAllJobs = false;
+    protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    protected boolean useIndexLFRUEviction = false;
+    protected float indexLFUEvictionFactor = 0.2f;
+    protected boolean ignoreMissingJournalfiles = false;
+    protected int indexCacheSize = 1000;
+    protected boolean enableIndexDiskSyncs = true;
+    protected boolean enableIndexRecoveryFile = true;
+    protected boolean enableIndexPageCaching = true;
+    protected boolean archiveDataLogs;
+    protected boolean purgeStoreOnStartup;
+    protected File directoryArchive;
+
+    protected AtomicBoolean opened = new AtomicBoolean();
+    protected Thread checkpointThread;
+    protected final Object checkpointThreadLock = new Object();
+    protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+    protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
+
+    /**
+     * @return the name to give this store's PageFile instance.
+     */
+    protected abstract String getPageFileName();
+
+    /**
+     * @return the location of the data directory if not set by configuration.
+     */
+    protected abstract File getDefaultDataDirectory();
+
+    /**
+     * Loads the store from disk.
+     *
+     * Based on configuration this method can either load an existing store or it can purge
+     * an existing store and start in a clean state.
+     *
+     * @throws IOException if an error occurs during the load.
+     */
+    public abstract void load() throws IOException;
+
+    /**
+     * Unload the state of the Store to disk and shuts down all resources assigned to this
+     * KahaDB store implementation.
+     *
+     * @throws IOException if an error occurs during the store unload.
+     */
+    public abstract void unload() throws IOException;
+
+    @Override
+    protected void doStart() throws Exception {
+        this.indexLock.writeLock().lock();
+        if (getDirectory() == null) {
+            setDirectory(getDefaultDataDirectory());
+        }
+        IOHelper.mkdirs(getDirectory());
+        try {
+            if (isPurgeStoreOnStartup()) {
+                getJournal().start();
+                getJournal().delete();
+                getJournal().close();
+                journal = null;
+                getPageFile().delete();
+                LOG.info("{} Persistence store purged.", this);
+                setPurgeStoreOnStartup(false);
+            }
+
+            load();
+            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        unload();
+    }
+
+    public PageFile getPageFile() {
+        if (pageFile == null) {
+            pageFile = createPageFile();
+        }
+        return pageFile;
+    }
+
+    public Journal getJournal() throws IOException {
+        if (journal == null) {
+            journal = createJournal();
+        }
+        return journal;
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public boolean isArchiveCorruptedIndex() {
+        return archiveCorruptedIndex;
+    }
+
+    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+        this.archiveCorruptedIndex = archiveCorruptedIndex;
+    }
+
+    public boolean isFailIfDatabaseIsLocked() {
+        return failIfDatabaseIsLocked;
+    }
+
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+    }
+
+    public boolean isCheckForCorruptJournalFiles() {
+        return checkForCorruptJournalFiles;
+    }
+
+    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
+    }
+
+    public long getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    public void setCheckpointInterval(long checkpointInterval) {
+        this.checkpointInterval = checkpointInterval;
+    }
+
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+    public boolean isChecksumJournalFiles() {
+        return checksumJournalFiles;
+    }
+
+    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+        this.checksumJournalFiles = checksumJournalFiles;
+    }
+
+    public boolean isForceRecoverIndex() {
+        return forceRecoverIndex;
+    }
+
+    public void setForceRecoverIndex(boolean forceRecoverIndex) {
+        this.forceRecoverIndex = forceRecoverIndex;
+    }
+
+    public int getJournalMaxFileLength() {
+        return journalMaxFileLength;
+    }
+
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
+    }
+
+    public int getJournalMaxWriteBatchSize() {
+        return journalMaxWriteBatchSize;
+    }
+
+    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
+    }
+
+    public boolean isEnableIndexWriteAsync() {
+        return enableIndexWriteAsync;
+    }
+
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        this.enableIndexWriteAsync = enableIndexWriteAsync;
+    }
+
+    public boolean isEnableJournalDiskSyncs() {
+        return enableJournalDiskSyncs;
+    }
+
+    public void setEnableJournalDiskSyncs(boolean syncWrites) {
+        this.enableJournalDiskSyncs = syncWrites;
+    }
+
+    public boolean isDeleteAllJobs() {
+        return deleteAllJobs;
+    }
+
+    public void setDeleteAllJobs(boolean deleteAllJobs) {
+        this.deleteAllJobs = deleteAllJobs;
+    }
+
+    /**
+     * @return the archiveDataLogs
+     */
+    public boolean isArchiveDataLogs() {
+        return this.archiveDataLogs;
+    }
+
+    /**
+     * @param archiveDataLogs the archiveDataLogs to set
+     */
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    /**
+     * @return the directoryArchive
+     */
+    public File getDirectoryArchive() {
+        return this.directoryArchive;
+    }
+
+    /**
+     * @param directoryArchive the directoryArchive to set
+     */
+    public void setDirectoryArchive(File directoryArchive) {
+        this.directoryArchive = directoryArchive;
+    }
+
+    public int getIndexCacheSize() {
+        return indexCacheSize;
+    }
+
+    public void setIndexCacheSize(int indexCacheSize) {
+        this.indexCacheSize = indexCacheSize;
+    }
+
+    public int getIndexWriteBatchSize() {
+        return indexWriteBatchSize;
+    }
+
+    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
+        this.indexWriteBatchSize = indexWriteBatchSize;
+    }
+
+    public boolean isUseIndexLFRUEviction() {
+        return useIndexLFRUEviction;
+    }
+
+    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
+        this.useIndexLFRUEviction = useIndexLFRUEviction;
+    }
+
+    public float getIndexLFUEvictionFactor() {
+        return indexLFUEvictionFactor;
+    }
+
+    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
+        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
+    }
+
+    public boolean isEnableIndexDiskSyncs() {
+        return enableIndexDiskSyncs;
+    }
+
+    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
+        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
+    }
+
+    public boolean isEnableIndexRecoveryFile() {
+        return enableIndexRecoveryFile;
+    }
+
+    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
+        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
+    }
+
+    public boolean isEnableIndexPageCaching() {
+        return enableIndexPageCaching;
+    }
+
+    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
+        this.enableIndexPageCaching = enableIndexPageCaching;
+    }
+
+    public boolean isPurgeStoreOnStartup() {
+        return this.purgeStoreOnStartup;
+    }
+
+    public void setPurgeStoreOnStartup(boolean purge) {
+        this.purgeStoreOnStartup = purge;
+    }
+
+    public boolean isIgnoreMissingJournalfiles() {
+        return ignoreMissingJournalfiles;
+    }
+
+    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
+        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
+    }
+
+    public long size() {
+        if (!isStarted()) {
+            return 0;
+        }
+        try {
+            return journalSize.get() + pageFile.getDiskSize();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Locker createDefaultLocker() throws IOException {
+        SharedFileLocker locker = new SharedFileLocker();
+        locker.setDirectory(this.getDirectory());
+        return locker;
+    }
+
+    @Override
+    public void init() throws Exception {
+    }
+
+    /**
+     * Store a command in the Journal and process to update the Store index.
+     *
+     * @param command
+     *      The specific JournalCommand to store and process.
+     *
+     * @return the Location where the data was written in the Journal.
+     *
+     * @throws IOException if an error occurs storing or processing the command.
+     */
+    public Location store(JournalCommand<?> command) throws IOException {
+        return store(command, isEnableIndexDiskSyncs(), null, null, null);
+    }
+
+    /**
+     * Store a command in the Journal and process to update the Store index.
+     *
+     * @param command
+     *      The specific JournalCommand to store and process.
+     * @param sync
+     *      Should the store operation be done synchronously. (ignored if completion passed).
+     *
+     * @return the Location where the data was written in the Journal.
+     *
+     * @throws IOException if an error occurs storing or processing the command.
+     */
+    public Location store(JournalCommand<?> command, boolean sync) throws IOException {
+        return store(command, sync, null, null, null);
+    }
+
+    /**
+     * Store a command in the Journal and process to update the Store index.
+     *
+     * @param command
+     *      The specific JournalCommand to store and process.
+     * @param onJournalStoreComplete
+     *      The Runnable to call when the Journal write operation completes.
+     *
+     * @return the Location where the data was written in the Journal.
+     *
+     * @throws IOException if an error occurs storing or processing the command.
+     */
+    public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
+        return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
+    }
+
+    /**
+     * Store a command in the Journal and process to update the Store index.
+     *
+     * @param command
+     *      The specific JournalCommand to store and process.
+     * @param sync
+     *      Should the store operation be done synchronously. (ignored if completion passed).
+     * @param before
+     *      The Runnable instance to execute before performing the store and process operation.
+     * @param after
+     *      The Runnable instance to execute after performing the store and process operation.
+     *
+     * @return the Location where the data was written in the Journal.
+     *
+     * @throws IOException if an error occurs storing or processing the command.
+     */
+    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
+        return store(command, sync, before, after, null);
+    }
+
+    /**
+     * All updates are funneled through this method. The updates are converted to a
+     * JournalMessage which is logged to the journal and then the data from the JournalMessage
+     * is used to update the index just like it would be done during a recovery process.
+     *
+     * @param command
+     *      The specific JournalCommand to store and process.
+     * @param sync
+     *      Should the store operation be done synchronously. (ignored if completion passed).
+     * @param before
+     *      The Runnable instance to execute before performing the store and process operation.
+     * @param after
+     *      The Runnable instance to execute after performing the store and process operation.
+     * @param onJournalStoreComplete
+     *      Callback to be run when the journal write operation is complete.
+     *
+     * @return the Location where the data was written in the Journal.
+     *
+     * @throws IOException if an error occurs storing or processing the command.
+     */
+    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
+        try {
+
+            if (before != null) {
+                before.run();
+            }
+
+            ByteSequence sequence = toByteSequence(command);
+            Location location;
+            checkpointLock.readLock().lock();
+            try {
+
+                long start = System.currentTimeMillis();
+                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
+                                                            journal.write(sequence, onJournalStoreComplete);
+                long start2 = System.currentTimeMillis();
+
+                process(command, location);
+
+                long end = System.currentTimeMillis();
+                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+                    LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
+                             (start2-start), (end-start2));
+                }
+            } finally {
+                checkpointLock.readLock().unlock();
+            }
+
+            if (after != null) {
+                after.run();
+            }
+
+            if (checkpointThread != null && !checkpointThread.isAlive()) {
+                startCheckpoint();
+            }
+            return location;
+        } catch (IOException ioe) {
+            LOG.error("KahaDB failed to store to Journal", ioe);
+            if (brokerService != null) {
+                brokerService.handleIOException(ioe);
+            }
+            throw ioe;
+        }
+    }
+
+    /**
+     * Loads a previously stored JournalMessage
+     *
+     * @param location
+     *      The location of the journal command to read.
+     *
+     * @return a new un-marshaled JournalCommand instance.
+     *
+     * @throws IOException if an error occurs reading the stored command.
+     */
+    protected JournalCommand<?> load(Location location) throws IOException {
+        ByteSequence data = journal.read(location);
+        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
+        byte readByte = is.readByte();
+        KahaEntryType type = KahaEntryType.valueOf(readByte);
+        if (type == null) {
+            try {
+                is.close();
+            } catch (IOException e) {
+            }
+            throw new IOException("Could not load journal record. Invalid location: " + location);
+        }
+        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
+        message.mergeFramed(is);
+        return message;
+    }
+
+    /**
+     * Process a stored or recovered JournalCommand instance and update the DB Index with the
+     * state changes that this command produces.  This can be called either as a new DB operation
+     * or as a replay during recovery operations.
+     *
+     * @param command
+     *      The JournalCommand to process.
+     * @param location
+     *      The location in the Journal where the command was written or read from.
+     */
+    protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
+
+    /**
+     * Perform a checkpoint operation with optional cleanup.
+     *
+     * Called by the checkpoint background thread periodically to initiate a checkpoint operation
+     * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
+     * longer needed journal log files etc.
+     *
+     * @param cleanup
+     *      Should the method do a simple checkpoint or also perform a journal cleanup.
+     *
+     * @throws IOException if an error occurs during the checkpoint operation.
+     */
+    protected void checkpointUpdate(final boolean cleanup) throws IOException {
+        checkpointLock.writeLock().lock();
+        try {
+            this.indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        checkpointUpdate(tx, cleanup);
+                    }
+                });
+            } finally {
+                this.indexLock.writeLock().unlock();
+            }
+
+        } finally {
+            checkpointLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Perform the checkpoint update operation.  If the cleanup flag is true then the
+     * operation should also purge any unused Journal log files.
+     *
+     * This method must always be called with the checkpoint and index write locks held.
+     *
+     * @param tx
+     *      The TX under which to perform the checkpoint update.
+     * @param cleanup
+     *      Should the checkpoint also do unused Journal file cleanup.
+     *
+     * @throws IOException if an error occurs while performing the checkpoint.
+     */
+    protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
+
+    /**
+     * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
+     *
+     * @param data
+     *      The Journal Command that should be marshaled to bytes for writing.
+     *
+     * @return the byte representation of the given journal command.
+     *
+     * @throws IOException if an error occurs while serializing the command.
+     */
+    protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
+        int size = data.serializedSizeFramed();
+        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+        os.writeByte(data.type().getNumber());
+        data.writeFramed(os);
+        return os.toByteSequence();
+    }
+
+    /**
+     * Create the PageFile instance and configure it using the configuration options
+     * currently set.
+     *
+     * @return the newly created and configured PageFile instance.
+     */
+    protected PageFile createPageFile() {
+        PageFile index = new PageFile(getDirectory(), getPageFileName());
+        index.setEnableWriteThread(isEnableIndexWriteAsync());
+        index.setWriteBatchSize(getIndexWriteBatchSize());
+        index.setPageCacheSize(getIndexCacheSize());
+        index.setUseLFRUEviction(isUseIndexLFRUEviction());
+        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
+        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
+        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
+        index.setEnablePageCaching(isEnableIndexPageCaching());
+        return index;
+    }
+
+    /**
+     * Create a new Journal instance and configure it using the currently set configuration
+     * options.  If an archive directory is configured then this method will attempt to create
+     * that directory if it does not already exist.
+     *
+     * @return the newly created and configured Journal instance.
+     *
+     * @throws IOException if an error occurs while creating the Journal object.
+     */
+    protected Journal createJournal() throws IOException {
+        Journal manager = new Journal();
+        manager.setDirectory(getDirectory());
+        manager.setMaxFileLength(getJournalMaxFileLength());
+        manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
+        manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
+        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
+        manager.setArchiveDataLogs(isArchiveDataLogs());
+        manager.setSizeAccumulator(journalSize);
+        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
+        if (getDirectoryArchive() != null) {
+            IOHelper.mkdirs(getDirectoryArchive());
+            manager.setDirectoryArchive(getDirectoryArchive());
+        }
+        return manager;
+    }
+
+    /**
+     * Starts the checkpoint Thread instance if not already running and not disabled
+     * by configuration.
+     */
+    protected void startCheckpoint() {
+        if (checkpointInterval == 0 && cleanupInterval == 0) {
+            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+            return;
+        }
+        synchronized (checkpointThreadLock) {
+            boolean start = false;
+            if (checkpointThread == null) {
+                start = true;
+            } else if (!checkpointThread.isAlive()) {
+                start = true;
+                LOG.info("KahaDB: Recovering checkpoint thread after death");
+            }
+            if (start) {
+                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+                    @Override
+                    public void run() {
+                        try {
+                            long lastCleanup = System.currentTimeMillis();
+                            long lastCheckpoint = System.currentTimeMillis();
+                            // Sleep for a short time so we can periodically check
+                            // to see if we need to exit this thread.
+                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+                            while (opened.get()) {
+                                Thread.sleep(sleepTime);
+                                long now = System.currentTimeMillis();
+                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
+                                    checkpointCleanup(true);
+                                    lastCleanup = now;
+                                    lastCheckpoint = now;
+                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
+                                    checkpointCleanup(false);
+                                    lastCheckpoint = now;
+                                }
+                            }
+                        } catch (InterruptedException e) {
+                            // Looks like someone really wants us to exit this thread...
+                        } catch (IOException ioe) {
+                            LOG.error("Checkpoint failed", ioe);
+                            brokerService.handleIOException(ioe);
+                        }
+                    }
+                };
+
+                checkpointThread.setDaemon(true);
+                checkpointThread.start();
+            }
+        }
+    }
+
+    /**
+     * Called from the worker thread to start a checkpoint.
+     *
+     * This method ensures that the store is in an opened state and optionally logs information
+     * related to slow store access times.
+     *
+     * @param cleanup
+     *      Should a cleanup of the journal occur during the checkpoint operation.
+     *
+     * @throws IOException if an error occurs during the checkpoint operation.
+     */
+    protected void checkpointCleanup(final boolean cleanup) throws IOException {
+        long start;
+        this.indexLock.writeLock().lock();
+        try {
+            start = System.currentTimeMillis();
+            if (!opened.get()) {
+                return;
+            }
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+        checkpointUpdate(cleanup);
+        long end = System.currentTimeMillis();
+        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
new file mode 100644
index 0000000..defb238
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+
+/**
+ * Interface for the store meta data used to hold the index value and other needed
+ * information to manage a KahaDB store instance.
+ */
+public interface KahaDBMetaData<T> {
+
+    /**
+     * Indicates that this meta data instance has been opened and is active.
+     */
+    public static final int OPEN_STATE = 2;
+
+    /**
+     * Indicates that this meta data instance has been closed and is no longer active.
+     */
+    public static final int CLOSED_STATE = 1;
+
+    /**
+     * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored.
+     *
+     * @return the Page to use to start accessing the KahaDBMetaData instance.
+     */
+    Page<T> getPage();
+
+    /**
+     * Sets the Page instance used to load and store the KahaDBMetaData instance.
+     *
+     * @param page
+     *        the new Page value to use.
+     */
+    void setPage(Page<T> page);
+
+    /**
+     * Gets the state flag of this meta data instance.
+     *
+     *  @return the current state value for this instance.
+     */
+    int getState();
+
+    /**
+     * Sets the current value of the state flag.
+     *
+     * @param value
+     *        the new value to assign to the state flag.
+     */
+    void setState(int value);
+
+    /**
+     * Returns the Journal Location value that indicates that last recorded update
+     * that was successfully performed for this KahaDB store implementation.
+     *
+     * @return the location of the last successful update location.
+     */
+    Location getLastUpdateLocation();
+
+    /**
+     * Updates the value of the last successful update.
+     *
+     * @param location
+     *        the new value to assign the last update location field.
+     */
+    void setLastUpdateLocation(Location location);
+
+    /**
+     * For a newly created KahaDBMetaData instance this method is called to allow
+     * the instance to create all of its internal indices and other state data.
+     *
+     * @param tx
+     *        the Transaction instance under which the operation is executed.
+     *
+     * @throws IOException if an error occurs while creating the meta data structures.
+     */
+    void initialize(Transaction tx) throws IOException;
+
+    /**
+     * Instructs this object to load its internal data structures from the KahaDB PageFile
+     * and prepare itself for use.
+     *
+     * @param tx
+     *        the Transaction instance under which the operation is executed.
+     *
+     * @throws IOException if an error occurs while creating the meta data structures.
+     */
+    void load(Transaction tx) throws IOException;
+
+    /**
+     * Reads the serialized form of this object from the KahaDB PageFile and prepares it
+     * for use.  This method does not need to perform a full load of the meta data structures
+     * only read in the information necessary to load them from the PageFile on a call to the
+     * load method.
+     *
+     * @param in
+     *        the DataInput instance used to read this objects serialized form.
+     *
+     * @throws IOException if an error occurs while reading the serialized form.
+     */
+    void read(DataInput in) throws IOException;
+
+    /**
+     * Writes the object into a serialized form which can be read back in again using the
+     * read method.
+     *
+     * @param out
+     *        the DataOutput instance to use to write the current state to a serialized form.
+     *
+     * @throws IOException if an error occurs while serializing this instance.
+     */
+    void write(DataOutput out) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index d8b986e..8ca8ca4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -31,6 +31,7 @@ import org.apache.activemq.broker.LockableServiceSupport;
 import org.apache.activemq.broker.Locker;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
 import org.apache.activemq.broker.jmx.PersistenceAdapterView;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,14 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionIdTransformerAware;
+import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -642,4 +650,9 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
     public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
         getStore().setTransactionIdTransformer(transactionIdTransformer);
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return this.letter.createJobSchedulerStore();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 60c0738..975cd05 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -55,7 +56,14 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -66,6 +74,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
@@ -172,6 +181,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     public int getMaxAsyncJobs() {
         return this.maxAsyncJobs;
     }
+
     /**
      * @param maxAsyncJobs
      *            the maxAsyncJobs to set
@@ -426,6 +436,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
         }
 
+        @Override
         public void updateMessage(Message message) throws IOException {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
@@ -472,7 +483,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             indexLock.writeLock().lock();
             try {
                 location = findMessageLocation(key, dest);
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
             if (location == null) {
@@ -492,19 +503,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         @Override
                         public Integer execute(Transaction tx) throws IOException {
                             // Iterate through all index entries to get a count
-                            // of
-                            // messages in the destination.
+                            // of messages in the destination.
                             StoredDestination sd = getStoredDestination(dest, tx);
                             int rc = 0;
-                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
-                                    .hasNext();) {
+                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
                                 iterator.next();
                                 rc++;
                             }
                             return rc;
                         }
                     });
-                }finally {
+                } finally {
                     indexLock.writeLock().unlock();
                 }
             } finally {
@@ -525,7 +534,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         return sd.locationIndex.isEmpty(tx);
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -552,12 +561,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
 
-
         @Override
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             indexLock.writeLock().lock();
@@ -583,7 +591,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         sd.orderIndex.stoppedIterating();
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -628,7 +636,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         });
                 } catch (Exception e) {
                     LOG.error("Failed to reset batching",e);
-                }finally {
+                } finally {
                     indexLock.writeLock().unlock();
                 }
             }
@@ -641,8 +649,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                 lockAsyncJobQueue();
 
                 // Hopefully one day the page file supports concurrent read
-                // operations... but for now we must
-                // externally synchronize...
+                // operations... but for now we must externally synchronize...
 
                 indexLock.writeLock().lock();
                 try {
@@ -725,8 +732,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
         @Override
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                                MessageId messageId, MessageAck ack)
-                throws IOException {
+                                MessageId messageId, MessageAck ack) throws IOException {
             String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
             if (isConcurrentStoreAndDispatchTopics()) {
                 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
@@ -810,7 +816,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
 
@@ -836,7 +842,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                                 .getSubscriptionInfo().newInput()));
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -859,7 +865,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         return (int) getStoredMessageCount(tx, sd, subscriptionKey);
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -890,7 +896,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         sd.orderIndex.resetCursorPosition();
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -943,7 +949,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            }finally {
+            } finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -1358,7 +1364,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                     LOG.warn("Failed to aquire lock", e);
                 }
             }
-
         }
 
         @Override
@@ -1422,7 +1427,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             if (runnable instanceof StoreTask) {
                ((StoreTask)runnable).releaseLocks();
             }
-
         }
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return new JobSchedulerStoreImpl();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index d10c4eb..eca83e8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -16,12 +16,44 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import org.apache.activemq.broker.*;
-import org.apache.activemq.command.*;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.Lockable;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionIdTransformerAware;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -30,13 +62,6 @@ import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.*;
-
 /**
  * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
  * distribution of destinations across multiple kahaDB persistence adapters
@@ -50,6 +75,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
 
     final class DelegateDestinationMap extends DestinationMap {
+        @Override
         public void setEntries(List<DestinationMapEntry>  entries) {
             super.setEntries(entries);
         }
@@ -252,7 +278,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
         if (adapter instanceof PersistenceAdapter) {
             adapter.removeQueueMessageStore(destination);
-            removeMessageStore((PersistenceAdapter)adapter, destination);
+            removeMessageStore(adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
@@ -267,7 +293,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
         if (adapter instanceof PersistenceAdapter) {
             adapter.removeTopicMessageStore(destination);
-            removeMessageStore((PersistenceAdapter)adapter, destination);
+            removeMessageStore(adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
@@ -453,6 +479,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
@@ -503,4 +530,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         locker.configure(this);
         return locker;
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return new JobSchedulerStoreImpl();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index c7ece83..8840a1d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -31,16 +31,24 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,6 +194,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return inflightTransactions.remove(txid);
     }
 
+    @Override
     public void prepare(TransactionId txid) throws IOException {
         Tx tx = getTx(txid);
         for (TransactionStore store : tx.getStores()) {
@@ -193,6 +202,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         }
     }
 
+    @Override
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
             throws IOException {
 
@@ -247,6 +257,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return location;
     }
 
+    @Override
     public void rollback(TransactionId txid) throws IOException {
         Tx tx = removeTx(txid);
         if (tx != null) {
@@ -256,6 +267,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         }
     }
 
+    @Override
     public void start() throws Exception {
         journal = new Journal() {
             @Override
@@ -289,6 +301,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
     }
 
+    @Override
     public void stop() throws Exception {
         journal.close();
         journal = null;
@@ -334,6 +347,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     }
 
 
+    @Override
     public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
 
         for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 66ae496..45e35c6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -51,31 +52,35 @@ import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
 
 public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
 
     private final WireFormat wireFormat = new OpenWireFormat();
     private BrokerService brokerService;
 
+    @Override
     public void setBrokerName(String brokerName) {
     }
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
-            
+
+            @Override
             public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
                 if (preCommit != null) {
                     preCommit.run();
@@ -85,18 +90,21 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     postCommit.run();
                 }
             }
+            @Override
             public void prepare(TransactionId txid) throws IOException {
-            	processPrepare(txid);
+                processPrepare(txid);
             }
+            @Override
             public void rollback(TransactionId txid) throws IOException {
-            	processRollback(txid);
+                processRollback(txid);
             }
+            @Override
             public void recover(TransactionRecoveryListener listener) throws IOException {
                 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
                     XATransactionId xid = (XATransactionId)entry.getKey();
                     ArrayList<Message> messageList = new ArrayList<Message>();
                     ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-                    
+
                     for (Operation op : entry.getValue()) {
                         if( op.getClass() == AddOpperation.class ) {
                             AddOpperation addOp = (AddOpperation)op;
@@ -108,7 +116,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             ackList.add(ack);
                         }
                     }
-                    
+
                     Message[] addedMessages = new Message[messageList.size()];
                     MessageAck[] acks = new MessageAck[ackList.size()];
                     messageList.toArray(addedMessages);
@@ -116,8 +124,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     listener.recover(xid, addedMessages, acks);
                 }
             }
+            @Override
             public void start() throws Exception {
             }
+            @Override
             public void stop() throws Exception {
             }
         };
@@ -136,13 +146,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             return destination;
         }
 
+        @Override
         public void addMessage(ConnectionContext context, Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
             processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
         }
-        
+
+        @Override
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
@@ -150,20 +162,23 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             processRemove(command, ack.getTransactionId());
         }
 
+        @Override
         public void removeAllMessages(ConnectionContext context) throws IOException {
             KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
             command.setDestination(dest);
             process(command);
         }
 
+        @Override
         public Message getMessage(MessageId identity) throws IOException {
             final String key = identity.toProducerKey();
-            
+
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
             ByteSequence data;
             synchronized(indexMutex) {
                 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
+                    @Override
                     public ByteSequence execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long sequence = sd.messageIdIndex.get(tx, key);
@@ -177,14 +192,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             if( data == null ) {
                 return null;
             }
-            
+
             Message msg = (Message)wireFormat.unmarshal( data );
-			return msg;
+            return msg;
         }
-        
+
+        @Override
         public int getMessageCount() throws IOException {
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    @Override
                     public Integer execute(Transaction tx) throws IOException {
                         // Iterate through all index entries to get a count of messages in the destination.
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -199,9 +216,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
+        @Override
         public void recover(final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
@@ -214,10 +233,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         long cursorPos=0;
-        
+
+        @Override
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageRecord> entry=null;
@@ -238,20 +259,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
+        @Override
         public void resetBatching() {
             cursorPos=0;
         }
 
-        
+
         @Override
         public void setBatch(MessageId identity) throws IOException {
             final String key = identity.toProducerKey();
-            
+
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
             Long location;
             synchronized(indexMutex) {
                 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
+                    @Override
                     public Long execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         return sd.messageIdIndex.get(tx, key);
@@ -261,7 +284,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             if( location!=null ) {
                 cursorPos=location+1;
             }
-            
+
         }
 
         @Override
@@ -273,14 +296,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         @Override
         public void stop() throws Exception {
         }
-        
+
     }
-        
+
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
         public KahaDBTopicMessageStore(ActiveMQTopic destination) {
             super(destination);
         }
-        
+
+        @Override
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                 MessageId messageId, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@@ -294,6 +318,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             processRemove(command, null);
         }
 
+        @Override
         public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
             String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
@@ -305,6 +330,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             process(command);
         }
 
+        @Override
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
@@ -312,11 +338,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             process(command);
         }
 
+        @Override
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-            
+
             final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
@@ -328,16 +356,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     }
                 });
             }
-            
+
             SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
             subscriptions.toArray(rc);
             return rc;
         }
 
+        @Override
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+                    @Override
                     public SubscriptionInfo execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
@@ -349,11 +379,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                 });
             }
         }
-       
+
+        @Override
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    @Override
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
@@ -362,7 +394,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             return 0;
                         }
                         cursorPos += 1;
-                        
+
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             iterator.next();
@@ -371,18 +403,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                         return counter;
                     }
                 });
-            }        
+            }
         }
 
+        @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         cursorPos += 1;
-                        
+
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             Entry<Long, MessageRecord> entry = iterator.next();
                             listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
@@ -392,10 +426,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
+        @Override
         public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
@@ -403,7 +439,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                             cursorPos += 1;
                         }
-                        
+
                         Entry<Long, MessageRecord> entry=null;
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
@@ -422,11 +458,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
+        @Override
         public void resetBatching(String clientId, String subscriptionName) {
             try {
                 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                 synchronized(indexMutex) {
                     pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                        @Override
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             sd.subscriptionCursors.remove(subscriptionKey);
@@ -442,11 +480,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
     String subscriptionKey(String clientId, String subscriptionName){
         return clientId+":"+subscriptionName;
     }
-    
+
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         return new KahaDBMessageStore(destination);
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
         return new KahaDBTopicMessageStore(destination);
     }
@@ -457,6 +497,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
     }
 
@@ -466,18 +507,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
     }
-    
-    
+
+
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         try {
             final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
@@ -491,11 +536,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             throw new RuntimeException(e);
         }
     }
-    
+
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
-    
+
+    @Override
     public long size() {
         if ( !started.get() ) {
             return 0;
@@ -507,32 +554,36 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
+    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    
+
+    @Override
     public void checkpoint(boolean sync) throws IOException {
-    }    
+    }
 
     ///////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     ///////////////////////////////////////////////////////////////////
-    
 
-    
+
+
     KahaLocation convert(Location location) {
         KahaLocation rc = new KahaLocation();
         rc.setLogId(location.getDataFileId());
         rc.setOffset(location.getOffset());
         return rc;
     }
-    
+
     KahaDestination convert(ActiveMQDestination dest) {
         KahaDestination rc = new KahaDestination();
         rc.setName(dest.getPhysicalName());
@@ -561,7 +612,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
         int type = Integer.parseInt(dest.substring(0, p));
         String name = dest.substring(p+1);
-        
+
         switch( KahaDestination.DestinationType.valueOf(type) ) {
         case QUEUE:
             return new ActiveMQQueue(name);
@@ -571,11 +622,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             return new ActiveMQTempQueue(name);
         case TEMP_TOPIC:
             return new ActiveMQTempTopic(name);
-        default:    
+        default:
             throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
-    
+
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         return -1;
     }
@@ -592,4 +644,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
         super.load();
     }
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index be4f2ff..43fc152 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -20,11 +20,16 @@ import java.io.IOException;
 
 import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
+import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -62,6 +67,21 @@ public class Visitor {
     public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
     }
 
+    public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException {
+    }
+
+    public void visit(KahaRescheduleJobCommand kahaRescheduleJobCommand) throws IOException {
+    }
+
+    public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException {
+    }
+
+    public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException {
+    }
+
+    public void visit(KahaDestroySchedulerCommand kahaDestroySchedulerCommand) throws IOException {
+    }
+
     public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
index 86b9fa3..217bc1f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
@@ -25,8 +25,8 @@ public class JobImpl implements Job {
     private final JobLocation jobLocation;
     private final byte[] payload;
 
-    protected JobImpl(JobLocation location,ByteSequence bs) {
-        this.jobLocation=location;
+    protected JobImpl(JobLocation location, ByteSequence bs) {
+        this.jobLocation = location;
         this.payload = new byte[bs.getLength()];
         System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
     }
@@ -38,22 +38,22 @@ public class JobImpl implements Job {
 
     @Override
     public byte[] getPayload() {
-       return this.payload;
+        return this.payload;
     }
 
     @Override
     public long getPeriod() {
-       return this.jobLocation.getPeriod();
+        return this.jobLocation.getPeriod();
     }
 
     @Override
     public int getRepeat() {
-       return this.jobLocation.getRepeat();
+        return this.jobLocation.getRepeat();
     }
 
     @Override
     public long getStart() {
-       return this.jobLocation.getStartTime();
+        return this.jobLocation.getStartTime();
     }
 
     @Override
@@ -76,4 +76,13 @@ public class JobImpl implements Job {
         return JobSupport.getDateTime(getStart());
     }
 
+    @Override
+    public int getExecutionCount() {
+        return this.jobLocation.getRescheduledCount();
+    }
+
+    @Override
+    public String toString() {
+        return "Job: " + getJobId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
index 13cf376..cb66145 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
@@ -36,6 +36,8 @@ class JobLocation {
     private long period;
     private String cronEntry;
     private final Location location;
+    private int rescheduledCount;
+    private Location lastUpdate;
 
     public JobLocation(Location location) {
         this.location = location;
@@ -52,8 +54,12 @@ class JobLocation {
         this.delay = in.readLong();
         this.nextTime = in.readLong();
         this.period = in.readLong();
-        this.cronEntry=in.readUTF();
+        this.cronEntry = in.readUTF();
         this.location.readExternal(in);
+        if (in.readBoolean()) {
+            this.lastUpdate = new Location();
+            this.lastUpdate.readExternal(in);
+        }
     }
 
     public void writeExternal(DataOutput out) throws IOException {
@@ -63,11 +69,17 @@ class JobLocation {
         out.writeLong(this.delay);
         out.writeLong(this.nextTime);
         out.writeLong(this.period);
-        if (this.cronEntry==null) {
-            this.cronEntry="";
+        if (this.cronEntry == null) {
+            this.cronEntry = "";
         }
         out.writeUTF(this.cronEntry);
         this.location.writeExternal(out);
+        if (lastUpdate != null) {
+            out.writeBoolean(true);
+            this.lastUpdate.writeExternal(out);
+        } else {
+            out.writeBoolean(false);
+        }
     }
 
     /**
@@ -123,7 +135,8 @@ class JobLocation {
     }
 
     /**
-     * @param nextTime the nextTime to set
+     * @param nextTime
+     *            the nextTime to set
      */
     public synchronized void setNextTime(long nextTime) {
         this.nextTime = nextTime;
@@ -152,7 +165,8 @@ class JobLocation {
     }
 
     /**
-     * @param cronEntry the cronEntry to set
+     * @param cronEntry
+     *            the cronEntry to set
      */
     public synchronized void setCronEntry(String cronEntry) {
         this.cronEntry = cronEntry;
@@ -173,7 +187,8 @@ class JobLocation {
     }
 
     /**
-     * @param delay the delay to set
+     * @param delay
+     *            the delay to set
      */
     public void setDelay(long delay) {
         this.delay = delay;
@@ -186,15 +201,55 @@ class JobLocation {
         return this.location;
     }
 
+    /**
+     * @return the location in the journal of the last update issued for this
+     *          Job.
+     */
+    public Location getLastUpdate() {
+        return this.lastUpdate;
+    }
+
+    /**
+     * Sets the location of the last update command written to the journal for
+     * this Job. The update commands set the next execution time for this job.
+     * We need to keep track of only the latest update as it's the only one we
+     * really need to recover the correct state from the journal.
+     *
+     * @param location
+     *            The location in the journal of the last update command.
+     */
+    public void setLastUpdate(Location location) {
+        this.lastUpdate = location;
+    }
+
+    /**
+     * @return the number of times this job has been rescheduled.
+     */
+    public int getRescheduledCount() {
+        return rescheduledCount;
+    }
+
+    /**
+     * Sets the number of times this job has been rescheduled.  A newly added job will return
+     * zero and increment this value each time a scheduled message is dispatched to its
+     * target destination and the job is rescheduled for another cycle.
+     *
+     * @param rescheduledCount
+     *        the new rescheduled count to assign the JobLocation.
+     */
+    public void setRescheduledCount(int rescheduledCount) {
+        this.rescheduledCount = rescheduledCount;
+    }
+
     @Override
     public String toString() {
-        return "Job [id=" + jobId + ", startTime=" + new Date(startTime)
-                + ", delay=" + delay + ", period=" + period + ", repeat="
-                + repeat + ", nextTime=" + new Date(nextTime) + "]";
+        return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime="
+            + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]";
     }
 
     static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
         static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
+
         @Override
         public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
             List<JobLocation> result = new ArrayList<JobLocation>();
@@ -228,6 +283,7 @@ class JobLocation {
         result = prime * result + (int) (period ^ (period >>> 32));
         result = prime * result + repeat;
         result = prime * result + (int) (startTime ^ (startTime >>> 32));
+        result = prime * result + rescheduledCount;
         return result;
     }
 
@@ -286,6 +342,9 @@ class JobLocation {
         if (startTime != other.startTime) {
             return false;
         }
+        if (rescheduledCount != other.rescheduledCount) {
+            return false;
+        }
 
         return true;
     }


Mime
View raw message