activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [4/5] activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-3758"
Date Thu, 25 Dec 2014 04:02:26 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
deleted file mode 100644
index 6003c87..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
+++ /dev/null
@@ -1,745 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.activemq.broker.LockableServiceSupport;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractKahaDBStore extends LockableServiceSupport {
-
-    static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
-
-    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
-    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
-
-    protected File directory;
-    protected PageFile pageFile;
-    protected Journal journal;
-    protected AtomicLong journalSize = new AtomicLong(0);
-    protected boolean failIfDatabaseIsLocked;
-    protected long checkpointInterval = 5*1000;
-    protected long cleanupInterval = 30*1000;
-    protected boolean checkForCorruptJournalFiles = false;
-    protected boolean checksumJournalFiles = true;
-    protected boolean forceRecoverIndex = false;
-    protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
-    protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
-    protected boolean archiveCorruptedIndex = false;
-    protected boolean enableIndexWriteAsync = false;
-    protected boolean enableJournalDiskSyncs = false;
-    protected boolean deleteAllJobs = false;
-    protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-    protected boolean useIndexLFRUEviction = false;
-    protected float indexLFUEvictionFactor = 0.2f;
-    protected boolean ignoreMissingJournalfiles = false;
-    protected int indexCacheSize = 1000;
-    protected boolean enableIndexDiskSyncs = true;
-    protected boolean enableIndexRecoveryFile = true;
-    protected boolean enableIndexPageCaching = true;
-    protected boolean archiveDataLogs;
-    protected boolean purgeStoreOnStartup;
-    protected File directoryArchive;
-
-    protected AtomicBoolean opened = new AtomicBoolean();
-    protected Thread checkpointThread;
-    protected final Object checkpointThreadLock = new Object();
-    protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
-    protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
-
-    /**
-     * @return the name to give this store's PageFile instance.
-     */
-    protected abstract String getPageFileName();
-
-    /**
-     * @return the location of the data directory if no set by configuration.
-     */
-    protected abstract File getDefaultDataDirectory();
-
-    /**
-     * Loads the store from disk.
-     *
-     * Based on configuration this method can either load an existing store or it can purge
-     * an existing store and start in a clean state.
-     *
-     * @throws IOException if an error occurs during the load.
-     */
-    public abstract void load() throws IOException;
-
-    /**
-     * Unload the state of the Store to disk and shuts down all resources assigned to this
-     * KahaDB store implementation.
-     *
-     * @throws IOException if an error occurs during the store unload.
-     */
-    public abstract void unload() throws IOException;
-
-    @Override
-    protected void doStart() throws Exception {
-        this.indexLock.writeLock().lock();
-        if (getDirectory() == null) {
-            setDirectory(getDefaultDataDirectory());
-        }
-        IOHelper.mkdirs(getDirectory());
-        try {
-            if (isPurgeStoreOnStartup()) {
-                getJournal().start();
-                getJournal().delete();
-                getJournal().close();
-                journal = null;
-                getPageFile().delete();
-                LOG.info("{} Persistence store purged.", this);
-                setPurgeStoreOnStartup(false);
-            }
-
-            load();
-            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        unload();
-    }
-
-    public PageFile getPageFile() {
-        if (pageFile == null) {
-            pageFile = createPageFile();
-        }
-        return pageFile;
-    }
-
-    public Journal getJournal() throws IOException {
-        if (journal == null) {
-            journal = createJournal();
-        }
-        return journal;
-    }
-
-    public File getDirectory() {
-        return directory;
-    }
-
-    public void setDirectory(File directory) {
-        this.directory = directory;
-    }
-
-    public boolean isArchiveCorruptedIndex() {
-        return archiveCorruptedIndex;
-    }
-
-    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
-        this.archiveCorruptedIndex = archiveCorruptedIndex;
-    }
-
-    public boolean isFailIfDatabaseIsLocked() {
-        return failIfDatabaseIsLocked;
-    }
-
-    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
-        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
-    }
-
-    public boolean isCheckForCorruptJournalFiles() {
-        return checkForCorruptJournalFiles;
-    }
-
-    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
-        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
-    }
-
-    public long getCheckpointInterval() {
-        return checkpointInterval;
-    }
-
-    public void setCheckpointInterval(long checkpointInterval) {
-        this.checkpointInterval = checkpointInterval;
-    }
-
-    public long getCleanupInterval() {
-        return cleanupInterval;
-    }
-
-    public void setCleanupInterval(long cleanupInterval) {
-        this.cleanupInterval = cleanupInterval;
-    }
-
-    public boolean isChecksumJournalFiles() {
-        return checksumJournalFiles;
-    }
-
-    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
-        this.checksumJournalFiles = checksumJournalFiles;
-    }
-
-    public boolean isForceRecoverIndex() {
-        return forceRecoverIndex;
-    }
-
-    public void setForceRecoverIndex(boolean forceRecoverIndex) {
-        this.forceRecoverIndex = forceRecoverIndex;
-    }
-
-    public int getJournalMaxFileLength() {
-        return journalMaxFileLength;
-    }
-
-    public void setJournalMaxFileLength(int journalMaxFileLength) {
-        this.journalMaxFileLength = journalMaxFileLength;
-    }
-
-    public int getJournalMaxWriteBatchSize() {
-        return journalMaxWriteBatchSize;
-    }
-
-    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
-        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
-    }
-
-    public boolean isEnableIndexWriteAsync() {
-        return enableIndexWriteAsync;
-    }
-
-    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
-        this.enableIndexWriteAsync = enableIndexWriteAsync;
-    }
-
-    public boolean isEnableJournalDiskSyncs() {
-        return enableJournalDiskSyncs;
-    }
-
-    public void setEnableJournalDiskSyncs(boolean syncWrites) {
-        this.enableJournalDiskSyncs = syncWrites;
-    }
-
-    public boolean isDeleteAllJobs() {
-        return deleteAllJobs;
-    }
-
-    public void setDeleteAllJobs(boolean deleteAllJobs) {
-        this.deleteAllJobs = deleteAllJobs;
-    }
-
-    /**
-     * @return the archiveDataLogs
-     */
-    public boolean isArchiveDataLogs() {
-        return this.archiveDataLogs;
-    }
-
-    /**
-     * @param archiveDataLogs the archiveDataLogs to set
-     */
-    public void setArchiveDataLogs(boolean archiveDataLogs) {
-        this.archiveDataLogs = archiveDataLogs;
-    }
-
-    /**
-     * @return the directoryArchive
-     */
-    public File getDirectoryArchive() {
-        return this.directoryArchive;
-    }
-
-    /**
-     * @param directoryArchive the directoryArchive to set
-     */
-    public void setDirectoryArchive(File directoryArchive) {
-        this.directoryArchive = directoryArchive;
-    }
-
-    public int getIndexCacheSize() {
-        return indexCacheSize;
-    }
-
-    public void setIndexCacheSize(int indexCacheSize) {
-        this.indexCacheSize = indexCacheSize;
-    }
-
-    public int getIndexWriteBatchSize() {
-        return indexWriteBatchSize;
-    }
-
-    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
-        this.indexWriteBatchSize = indexWriteBatchSize;
-    }
-
-    public boolean isUseIndexLFRUEviction() {
-        return useIndexLFRUEviction;
-    }
-
-    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
-        this.useIndexLFRUEviction = useIndexLFRUEviction;
-    }
-
-    public float getIndexLFUEvictionFactor() {
-        return indexLFUEvictionFactor;
-    }
-
-    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
-        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
-    }
-
-    public boolean isEnableIndexDiskSyncs() {
-        return enableIndexDiskSyncs;
-    }
-
-    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
-        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
-    }
-
-    public boolean isEnableIndexRecoveryFile() {
-        return enableIndexRecoveryFile;
-    }
-
-    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
-        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
-    }
-
-    public boolean isEnableIndexPageCaching() {
-        return enableIndexPageCaching;
-    }
-
-    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
-        this.enableIndexPageCaching = enableIndexPageCaching;
-    }
-
-    public boolean isPurgeStoreOnStartup() {
-        return this.purgeStoreOnStartup;
-    }
-
-    public void setPurgeStoreOnStartup(boolean purge) {
-        this.purgeStoreOnStartup = purge;
-    }
-
-    public boolean isIgnoreMissingJournalfiles() {
-        return ignoreMissingJournalfiles;
-    }
-
-    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
-        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
-    }
-
-    public long size() {
-        if (!isStarted()) {
-            return 0;
-        }
-        try {
-            return journalSize.get() + pageFile.getDiskSize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Locker createDefaultLocker() throws IOException {
-        SharedFileLocker locker = new SharedFileLocker();
-        locker.setDirectory(this.getDirectory());
-        return locker;
-    }
-
-    @Override
-    public void init() throws Exception {
-    }
-
-    /**
-     * Store a command in the Journal and process to update the Store index.
-     *
-     * @param command
-     *      The specific JournalCommand to store and process.
-     *
-     * @returns the Location where the data was written in the Journal.
-     *
-     * @throws IOException if an error occurs storing or processing the command.
-     */
-    public Location store(JournalCommand<?> command) throws IOException {
-        return store(command, isEnableIndexDiskSyncs(), null, null, null);
-    }
-
-    /**
-     * Store a command in the Journal and process to update the Store index.
-     *
-     * @param command
-     *      The specific JournalCommand to store and process.
-     * @param sync
-     *      Should the store operation be done synchronously. (ignored if completion passed).
-     *
-     * @returns the Location where the data was written in the Journal.
-     *
-     * @throws IOException if an error occurs storing or processing the command.
-     */
-    public Location store(JournalCommand<?> command, boolean sync) throws IOException {
-        return store(command, sync, null, null, null);
-    }
-
-    /**
-     * Store a command in the Journal and process to update the Store index.
-     *
-     * @param command
-     *      The specific JournalCommand to store and process.
-     * @param onJournalStoreComplete
-     *      The Runnable to call when the Journal write operation completes.
-     *
-     * @returns the Location where the data was written in the Journal.
-     *
-     * @throws IOException if an error occurs storing or processing the command.
-     */
-    public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
-        return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
-    }
-
-    /**
-     * Store a command in the Journal and process to update the Store index.
-     *
-     * @param command
-     *      The specific JournalCommand to store and process.
-     * @param sync
-     *      Should the store operation be done synchronously. (ignored if completion passed).
-     * @param before
-     *      The Runnable instance to execute before performing the store and process operation.
-     * @param after
-     *      The Runnable instance to execute after performing the store and process operation.
-     *
-     * @returns the Location where the data was written in the Journal.
-     *
-     * @throws IOException if an error occurs storing or processing the command.
-     */
-    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
-        return store(command, sync, before, after, null);
-    }
-
-    /**
-     * All updated are are funneled through this method. The updates are converted to a
-     * JournalMessage which is logged to the journal and then the data from the JournalMessage
-     * is used to update the index just like it would be done during a recovery process.
-     *
-     * @param command
-     *      The specific JournalCommand to store and process.
-     * @param sync
-     *      Should the store operation be done synchronously. (ignored if completion passed).
-     * @param before
-     *      The Runnable instance to execute before performing the store and process operation.
-     * @param after
-     *      The Runnable instance to execute after performing the store and process operation.
-     * @param onJournalStoreComplete
-     *      Callback to be run when the journal write operation is complete.
-     *
-     * @returns the Location where the data was written in the Journal.
-     *
-     * @throws IOException if an error occurs storing or processing the command.
-     */
-    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
-        try {
-
-            if (before != null) {
-                before.run();
-            }
-
-            ByteSequence sequence = toByteSequence(command);
-            Location location;
-            checkpointLock.readLock().lock();
-            try {
-
-                long start = System.currentTimeMillis();
-                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
-                                                            journal.write(sequence, onJournalStoreComplete);
-                long start2 = System.currentTimeMillis();
-
-                process(command, location);
-
-                long end = System.currentTimeMillis();
-                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
-                    LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
-                             (start2-start), (end-start2));
-                }
-            } finally {
-                checkpointLock.readLock().unlock();
-            }
-
-            if (after != null) {
-                after.run();
-            }
-
-            if (checkpointThread != null && !checkpointThread.isAlive()) {
-                startCheckpoint();
-            }
-            return location;
-        } catch (IOException ioe) {
-            LOG.error("KahaDB failed to store to Journal", ioe);
-            if (brokerService != null) {
-                brokerService.handleIOException(ioe);
-            }
-            throw ioe;
-        }
-    }
-
-    /**
-     * Loads a previously stored JournalMessage
-     *
-     * @param location
-     *      The location of the journal command to read.
-     *
-     * @return a new un-marshaled JournalCommand instance.
-     *
-     * @throws IOException if an error occurs reading the stored command.
-     */
-    protected JournalCommand<?> load(Location location) throws IOException {
-        ByteSequence data = journal.read(location);
-        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
-        byte readByte = is.readByte();
-        KahaEntryType type = KahaEntryType.valueOf(readByte);
-        if (type == null) {
-            try {
-                is.close();
-            } catch (IOException e) {
-            }
-            throw new IOException("Could not load journal record. Invalid location: " + location);
-        }
-        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
-        message.mergeFramed(is);
-        return message;
-    }
-
-    /**
-     * Process a stored or recovered JournalCommand instance and update the DB Index with the
-     * state changes that this command produces.  This can be called either as a new DB operation
-     * or as a replay during recovery operations.
-     *
-     * @param command
-     *      The JournalCommand to process.
-     * @param location
-     *      The location in the Journal where the command was written or read from.
-     */
-    protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
-
-    /**
-     * Perform a checkpoint operation with optional cleanup.
-     *
-     * Called by the checkpoint background thread periodically to initiate a checkpoint operation
-     * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
-     * longer needed journal log files etc.
-     *
-     * @param cleanup
-     *      Should the method do a simple checkpoint or also perform a journal cleanup.
-     *
-     * @throws IOException if an error occurs during the checkpoint operation.
-     */
-    protected void checkpointUpdate(final boolean cleanup) throws IOException {
-        checkpointLock.writeLock().lock();
-        try {
-            this.indexLock.writeLock().lock();
-            try {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    @Override
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
-                    }
-                });
-            } finally {
-                this.indexLock.writeLock().unlock();
-            }
-
-        } finally {
-            checkpointLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Perform the checkpoint update operation.  If the cleanup flag is true then the
-     * operation should also purge any unused Journal log files.
-     *
-     * This method must always be called with the checkpoint and index write locks held.
-     *
-     * @param tx
-     *      The TX under which to perform the checkpoint update.
-     * @param cleanup
-     *      Should the checkpoint also do unused Journal file cleanup.
-     *
-     * @throws IOException if an error occurs while performing the checkpoint.
-     */
-    protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
-
-    /**
-     * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
-     *
-     * @param command
-     *      The Journal Command that should be marshaled to bytes for writing.
-     *
-     * @return the byte representation of the given journal command.
-     *
-     * @throws IOException if an error occurs while serializing the command.
-     */
-    protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-        return os.toByteSequence();
-    }
-
-    /**
-     * Create the PageFile instance and configure it using the configuration options
-     * currently set.
-     *
-     * @return the newly created and configured PageFile instance.
-     */
-    protected PageFile createPageFile() {
-        PageFile index = new PageFile(getDirectory(), getPageFileName());
-        index.setEnableWriteThread(isEnableIndexWriteAsync());
-        index.setWriteBatchSize(getIndexWriteBatchSize());
-        index.setPageCacheSize(getIndexCacheSize());
-        index.setUseLFRUEviction(isUseIndexLFRUEviction());
-        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
-        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
-        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
-        index.setEnablePageCaching(isEnableIndexPageCaching());
-        return index;
-    }
-
-    /**
-     * Create a new Journal instance and configure it using the currently set configuration
-     * options.  If an archive directory is configured than this method will attempt to create
-     * that directory if it does not already exist.
-     *
-     * @return the newly created an configured Journal instance.
-     *
-     * @throws IOException if an error occurs while creating the Journal object.
-     */
-    protected Journal createJournal() throws IOException {
-        Journal manager = new Journal();
-        manager.setDirectory(getDirectory());
-        manager.setMaxFileLength(getJournalMaxFileLength());
-        manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
-        manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
-        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
-        manager.setArchiveDataLogs(isArchiveDataLogs());
-        manager.setSizeAccumulator(journalSize);
-        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
-        if (getDirectoryArchive() != null) {
-            IOHelper.mkdirs(getDirectoryArchive());
-            manager.setDirectoryArchive(getDirectoryArchive());
-        }
-        return manager;
-    }
-
-    /**
-     * Starts the checkpoint Thread instance if not already running and not disabled
-     * by configuration.
-     */
-    protected void startCheckpoint() {
-        if (checkpointInterval == 0 && cleanupInterval == 0) {
-            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
-            return;
-        }
-        synchronized (checkpointThreadLock) {
-            boolean start = false;
-            if (checkpointThread == null) {
-                start = true;
-            } else if (!checkpointThread.isAlive()) {
-                start = true;
-                LOG.info("KahaDB: Recovering checkpoint thread after death");
-            }
-            if (start) {
-                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-                    @Override
-                    public void run() {
-                        try {
-                            long lastCleanup = System.currentTimeMillis();
-                            long lastCheckpoint = System.currentTimeMillis();
-                            // Sleep for a short time so we can periodically check
-                            // to see if we need to exit this thread.
-                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
-                            while (opened.get()) {
-                                Thread.sleep(sleepTime);
-                                long now = System.currentTimeMillis();
-                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
-                                    checkpointCleanup(true);
-                                    lastCleanup = now;
-                                    lastCheckpoint = now;
-                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
-                                    checkpointCleanup(false);
-                                    lastCheckpoint = now;
-                                }
-                            }
-                        } catch (InterruptedException e) {
-                            // Looks like someone really wants us to exit this thread...
-                        } catch (IOException ioe) {
-                            LOG.error("Checkpoint failed", ioe);
-                            brokerService.handleIOException(ioe);
-                        }
-                    }
-                };
-
-                checkpointThread.setDaemon(true);
-                checkpointThread.start();
-            }
-        }
-    }
-
-    /**
-     * Called from the worker thread to start a checkpoint.
-     *
-     * This method ensure that the store is in an opened state and optionaly logs information
-     * related to slow store access times.
-     *
-     * @param cleanup
-     *      Should a cleanup of the journal occur during the checkpoint operation.
-     *
-     * @throws IOException if an error occurs during the checkpoint operation.
-     */
-    protected void checkpointCleanup(final boolean cleanup) throws IOException {
-        long start;
-        this.indexLock.writeLock().lock();
-        try {
-            start = System.currentTimeMillis();
-            if (!opened.get()) {
-                return;
-            }
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-        checkpointUpdate(cleanup);
-        long end = System.currentTimeMillis();
-        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
deleted file mode 100644
index defb238..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-
-/**
- * Interface for the store meta data used to hold the index value and other needed
- * information to manage a KahaDB store instance.
- */
-public interface KahaDBMetaData<T> {
-
-    /**
-     * Indicates that this meta data instance has been opened and is active.
-     */
-    public static final int OPEN_STATE = 2;
-
-    /**
-     * Indicates that this meta data instance has been closed and is no longer active.
-     */
-    public static final int CLOSED_STATE = 1;
-
-    /**
-     * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored.
-     *
-     * @return the Page to use to start access the KahaDBMetaData instance.
-     */
-    Page<T> getPage();
-
-    /**
-     * Sets the Page instance used to load and store the KahaDBMetaData instance.
-     *
-     * @param page
-     *        the new Page value to use.
-     */
-    void setPage(Page<T> page);
-
-    /**
-     * Gets the state flag of this meta data instance.
-     *
-     *  @return the current state value for this instance.
-     */
-    int getState();
-
-    /**
-     * Sets the current value of the state flag.
-     *
-     * @param value
-     *        the new value to assign to the state flag.
-     */
-    void setState(int value);
-
-    /**
-     * Returns the Journal Location value that indicates that last recorded update
-     * that was successfully performed for this KahaDB store implementation.
-     *
-     * @return the location of the last successful update location.
-     */
-    Location getLastUpdateLocation();
-
-    /**
-     * Updates the value of the last successful update.
-     *
-     * @param location
-     *        the new value to assign the last update location field.
-     */
-    void setLastUpdateLocation(Location location);
-
-    /**
-     * For a newly created KahaDBMetaData instance this method is called to allow
-     * the instance to create all of it's internal indices and other state data.
-     *
-     * @param tx
-     *        the Transaction instance under which the operation is executed.
-     *
-     * @throws IOException if an error occurs while creating the meta data structures.
-     */
-    void initialize(Transaction tx) throws IOException;
-
-    /**
-     * Instructs this object to load its internal data structures from the KahaDB PageFile
-     * and prepare itself for use.
-     *
-     * @param tx
-     *        the Transaction instance under which the operation is executed.
-     *
-     * @throws IOException if an error occurs while creating the meta data structures.
-     */
-    void load(Transaction tx) throws IOException;
-
-    /**
-     * Reads the serialized for of this object from the KadaDB PageFile and prepares it
-     * for use.  This method does not need to perform a full load of the meta data structures
-     * only read in the information necessary to load them from the PageFile on a call to the
-     * load method.
-     *
-     * @param in
-     *        the DataInput instance used to read this objects serialized form.
-     *
-     * @throws IOException if an error occurs while reading the serialized form.
-     */
-    void read(DataInput in) throws IOException;
-
-    /**
-     * Writes the object into a serialized form which can be read back in again using the
-     * read method.
-     *
-     * @param out
-     *        the DataOutput instance to use to write the current state to a serialized form.
-     *
-     * @throws IOException if an error occurs while serializing this instance.
-     */
-    void write(DataOutput out) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 9b83a0e..e199d68 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -31,7 +31,6 @@ import org.apache.activemq.broker.LockableServiceSupport;
 import org.apache.activemq.broker.Locker;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
 import org.apache.activemq.broker.jmx.PersistenceAdapterView;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -40,14 +39,7 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.JournaledStore;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionIdTransformerAware;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -667,9 +659,4 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
     public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
         getStore().setTransactionIdTransformer(transactionIdTransformer);
     }
-
-    @Override
-    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
-        return this.letter.createJobSchedulerStore();
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 975cd05..60c0738 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -56,14 +55,7 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.ListenableFuture;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -74,7 +66,6 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
@@ -181,7 +172,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     public int getMaxAsyncJobs() {
         return this.maxAsyncJobs;
     }
-
     /**
      * @param maxAsyncJobs
      *            the maxAsyncJobs to set
@@ -436,7 +426,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
         }
 
-        @Override
         public void updateMessage(Message message) throws IOException {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
@@ -483,7 +472,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             indexLock.writeLock().lock();
             try {
                 location = findMessageLocation(key, dest);
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
             if (location == null) {
@@ -503,17 +492,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         @Override
                         public Integer execute(Transaction tx) throws IOException {
                             // Iterate through all index entries to get a count
-                            // of messages in the destination.
+                            // of
+                            // messages in the destination.
                             StoredDestination sd = getStoredDestination(dest, tx);
                             int rc = 0;
-                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+                                    .hasNext();) {
                                 iterator.next();
                                 rc++;
                             }
                             return rc;
                         }
                     });
-                } finally {
+                }finally {
                     indexLock.writeLock().unlock();
                 }
             } finally {
@@ -534,7 +525,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         return sd.locationIndex.isEmpty(tx);
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -561,11 +552,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
 
+
         @Override
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             indexLock.writeLock().lock();
@@ -591,7 +583,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         sd.orderIndex.stoppedIterating();
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -636,7 +628,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         });
                 } catch (Exception e) {
                     LOG.error("Failed to reset batching",e);
-                } finally {
+                }finally {
                     indexLock.writeLock().unlock();
                 }
             }
@@ -649,7 +641,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                 lockAsyncJobQueue();
 
                 // Hopefully one day the page file supports concurrent read
-                // operations... but for now we must externally synchronize...
+                // operations... but for now we must
+                // externally synchronize...
 
                 indexLock.writeLock().lock();
                 try {
@@ -732,7 +725,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
         @Override
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                                MessageId messageId, MessageAck ack) throws IOException {
+                                MessageId messageId, MessageAck ack)
+                throws IOException {
             String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
             if (isConcurrentStoreAndDispatchTopics()) {
                 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
@@ -816,7 +810,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
 
@@ -842,7 +836,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                                 .getSubscriptionInfo().newInput()));
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -865,7 +859,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         return (int) getStoredMessageCount(tx, sd, subscriptionKey);
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -896,7 +890,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         sd.orderIndex.resetCursorPosition();
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -949,7 +943,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -1364,6 +1358,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                     LOG.warn("Failed to aquire lock", e);
                 }
             }
+
         }
 
         @Override
@@ -1427,11 +1422,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             if (runnable instanceof StoreTask) {
                ((StoreTask)runnable).releaseLocks();
             }
-        }
-    }
 
-    @Override
-    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
-        return new JobSchedulerStoreImpl();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index eca83e8..d10c4eb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -16,44 +16,12 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.Lockable;
-import org.apache.activemq.broker.LockableServiceSupport;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionIdTransformerAware;
-import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.store.*;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -62,6 +30,13 @@ import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+
 /**
  * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
  * distribution of destinations across multiple kahaDB persistence adapters
@@ -75,7 +50,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
 
     final class DelegateDestinationMap extends DestinationMap {
-        @Override
         public void setEntries(List<DestinationMapEntry>  entries) {
             super.setEntries(entries);
         }
@@ -278,7 +252,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
         if (adapter instanceof PersistenceAdapter) {
             adapter.removeQueueMessageStore(destination);
-            removeMessageStore(adapter, destination);
+            removeMessageStore((PersistenceAdapter)adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
@@ -293,7 +267,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
         if (adapter instanceof PersistenceAdapter) {
             adapter.removeTopicMessageStore(destination);
-            removeMessageStore(adapter, destination);
+            removeMessageStore((PersistenceAdapter)adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
@@ -479,7 +453,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         }
     }
 
-    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
@@ -530,9 +503,4 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         locker.configure(this);
         return locker;
     }
-
-    @Override
-    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
-        return new JobSchedulerStoreImpl();
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 8840a1d..c7ece83 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -31,24 +31,16 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.ListenableFuture;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.ProxyMessageStore;
-import org.apache.activemq.store.ProxyTopicMessageStore;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionRecoveryListener;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -194,7 +186,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return inflightTransactions.remove(txid);
     }
 
-    @Override
     public void prepare(TransactionId txid) throws IOException {
         Tx tx = getTx(txid);
         for (TransactionStore store : tx.getStores()) {
@@ -202,7 +193,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         }
     }
 
-    @Override
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
             throws IOException {
 
@@ -257,7 +247,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return location;
     }
 
-    @Override
     public void rollback(TransactionId txid) throws IOException {
         Tx tx = removeTx(txid);
         if (tx != null) {
@@ -267,7 +256,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         }
     }
 
-    @Override
     public void start() throws Exception {
         journal = new Journal() {
             @Override
@@ -301,7 +289,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
     }
 
-    @Override
     public void stop() throws Exception {
         journal.close();
         journal = null;
@@ -347,7 +334,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     }
 
 
-    @Override
     public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
 
         for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 45e35c6..66ae496 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -22,13 +22,12 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -52,35 +51,31 @@ import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
 
 public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
 
     private final WireFormat wireFormat = new OpenWireFormat();
     private BrokerService brokerService;
 
-    @Override
     public void setBrokerName(String brokerName) {
     }
-    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
-    @Override
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
-
-            @Override
+            
             public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
                 if (preCommit != null) {
                     preCommit.run();
@@ -90,21 +85,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     postCommit.run();
                 }
             }
-            @Override
             public void prepare(TransactionId txid) throws IOException {
-                processPrepare(txid);
+            	processPrepare(txid);
             }
-            @Override
             public void rollback(TransactionId txid) throws IOException {
-                processRollback(txid);
+            	processRollback(txid);
             }
-            @Override
             public void recover(TransactionRecoveryListener listener) throws IOException {
                 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
                     XATransactionId xid = (XATransactionId)entry.getKey();
                     ArrayList<Message> messageList = new ArrayList<Message>();
                     ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
+                    
                     for (Operation op : entry.getValue()) {
                         if( op.getClass() == AddOpperation.class ) {
                             AddOpperation addOp = (AddOpperation)op;
@@ -116,7 +108,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             ackList.add(ack);
                         }
                     }
-
+                    
                     Message[] addedMessages = new Message[messageList.size()];
                     MessageAck[] acks = new MessageAck[ackList.size()];
                     messageList.toArray(addedMessages);
@@ -124,10 +116,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     listener.recover(xid, addedMessages, acks);
                 }
             }
-            @Override
             public void start() throws Exception {
             }
-            @Override
             public void stop() throws Exception {
             }
         };
@@ -146,15 +136,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             return destination;
         }
 
-        @Override
         public void addMessage(ConnectionContext context, Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
             processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
         }
-
-        @Override
+        
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
@@ -162,23 +150,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             processRemove(command, ack.getTransactionId());
         }
 
-        @Override
         public void removeAllMessages(ConnectionContext context) throws IOException {
             KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
             command.setDestination(dest);
             process(command);
         }
 
-        @Override
         public Message getMessage(MessageId identity) throws IOException {
             final String key = identity.toProducerKey();
-
+            
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
             ByteSequence data;
             synchronized(indexMutex) {
                 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
-                    @Override
                     public ByteSequence execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long sequence = sd.messageIdIndex.get(tx, key);
@@ -192,16 +177,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             if( data == null ) {
                 return null;
             }
-
+            
             Message msg = (Message)wireFormat.unmarshal( data );
-            return msg;
+			return msg;
         }
-
-        @Override
+        
         public int getMessageCount() throws IOException {
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
-                    @Override
                     public Integer execute(Transaction tx) throws IOException {
                         // Iterate through all index entries to get a count of messages in the destination.
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -216,11 +199,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
-        @Override
         public void recover(final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
-                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
@@ -233,12 +214,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         long cursorPos=0;
-
-        @Override
+        
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
-                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageRecord> entry=null;
@@ -259,22 +238,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
-        @Override
         public void resetBatching() {
             cursorPos=0;
         }
 
-
+        
         @Override
         public void setBatch(MessageId identity) throws IOException {
             final String key = identity.toProducerKey();
-
+            
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
             Long location;
             synchronized(indexMutex) {
                 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
-                    @Override
                     public Long execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         return sd.messageIdIndex.get(tx, key);
@@ -284,7 +261,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             if( location!=null ) {
                 cursorPos=location+1;
             }
-
+            
         }
 
         @Override
@@ -296,15 +273,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         @Override
         public void stop() throws Exception {
         }
-
+        
     }
-
+        
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
         public KahaDBTopicMessageStore(ActiveMQTopic destination) {
             super(destination);
         }
-
-        @Override
+        
         public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                 MessageId messageId, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@@ -318,7 +294,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             processRemove(command, null);
         }
 
-        @Override
         public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
             String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
@@ -330,7 +305,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             process(command);
         }
 
-        @Override
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
@@ -338,13 +312,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             process(command);
         }
 
-        @Override
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-
+            
             final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>(){
-                    @Override
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
@@ -356,18 +328,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                     }
                 });
             }
-
+            
             SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
             subscriptions.toArray(rc);
             return rc;
         }
 
-        @Override
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
-                    @Override
                     public SubscriptionInfo execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
@@ -379,13 +349,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                 });
             }
         }
-
-        @Override
+       
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
-                    @Override
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
@@ -394,7 +362,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             return 0;
                         }
                         cursorPos += 1;
-
+                        
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             iterator.next();
@@ -403,20 +371,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                         return counter;
                     }
                 });
-            }
+            }        
         }
 
-        @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
-                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         cursorPos += 1;
-
+                        
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             Entry<Long, MessageRecord> entry = iterator.next();
                             listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
@@ -426,12 +392,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
-        @Override
         public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
-                    @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
@@ -439,7 +403,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
                             cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                             cursorPos += 1;
                         }
-
+                        
                         Entry<Long, MessageRecord> entry=null;
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
@@ -458,13 +422,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             }
         }
 
-        @Override
         public void resetBatching(String clientId, String subscriptionName) {
             try {
                 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                 synchronized(indexMutex) {
                     pageFile.tx().execute(new Transaction.Closure<IOException>(){
-                        @Override
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             sd.subscriptionCursors.remove(subscriptionKey);
@@ -480,13 +442,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
     String subscriptionKey(String clientId, String subscriptionName){
         return clientId+":"+subscriptionName;
     }
-
-    @Override
+    
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         return new KahaDBMessageStore(destination);
     }
 
-    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
         return new KahaDBTopicMessageStore(destination);
     }
@@ -497,7 +457,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
      *
      * @param destination Destination to forget
      */
-    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
     }
 
@@ -507,22 +466,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
      *
      * @param destination Destination to forget
      */
-    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
     }
 
-    @Override
     public void deleteAllMessages() throws IOException {
     }
-
-
-    @Override
+    
+    
     public Set<ActiveMQDestination> getDestinations() {
         try {
             final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>(){
-                    @Override
                     public void execute(Transaction tx) throws IOException {
                         for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
@@ -536,13 +491,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             throw new RuntimeException(e);
         }
     }
-
-    @Override
+    
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
-
-    @Override
+    
     public long size() {
         if ( !started.get() ) {
             return 0;
@@ -554,36 +507,32 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
     }
 
-    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-
-    @Override
+    
     public void checkpoint(boolean sync) throws IOException {
-    }
+    }    
 
     ///////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     ///////////////////////////////////////////////////////////////////
+    
 
-
-
+    
     KahaLocation convert(Location location) {
         KahaLocation rc = new KahaLocation();
         rc.setLogId(location.getDataFileId());
         rc.setOffset(location.getOffset());
         return rc;
     }
-
+    
     KahaDestination convert(ActiveMQDestination dest) {
         KahaDestination rc = new KahaDestination();
         rc.setName(dest.getPhysicalName());
@@ -612,7 +561,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
         int type = Integer.parseInt(dest.substring(0, p));
         String name = dest.substring(p+1);
-
+        
         switch( KahaDestination.DestinationType.valueOf(type) ) {
         case QUEUE:
             return new ActiveMQQueue(name);
@@ -622,12 +571,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
             return new ActiveMQTempQueue(name);
         case TEMP_TOPIC:
             return new ActiveMQTempTopic(name);
-        default:
+        default:    
             throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
-
-    @Override
+    
     public long getLastProducerSequenceId(ProducerId id) {
         return -1;
     }
@@ -644,8 +592,4 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
         super.load();
     }
-    @Override
-    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index 43fc152..be4f2ff 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -20,16 +20,11 @@ import java.io.IOException;
 
 import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -67,21 +62,6 @@ public class Visitor {
     public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
     }
 
-    public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException {
-    }
-
-    public void visit(KahaRescheduleJobCommand KahaRescheduleJobCommand) throws IOException {
-    }
-
-    public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException {
-    }
-
-    public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException {
-    }
-
-    public void visit(KahaDestroySchedulerCommand KahaDestroySchedulerCommand) throws IOException {
-    }
-
     public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
index 217bc1f..86b9fa3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
@@ -25,8 +25,8 @@ public class JobImpl implements Job {
     private final JobLocation jobLocation;
     private final byte[] payload;
 
-    protected JobImpl(JobLocation location, ByteSequence bs) {
-        this.jobLocation = location;
+    protected JobImpl(JobLocation location,ByteSequence bs) {
+        this.jobLocation=location;
         this.payload = new byte[bs.getLength()];
         System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
     }
@@ -38,22 +38,22 @@ public class JobImpl implements Job {
 
     @Override
     public byte[] getPayload() {
-        return this.payload;
+       return this.payload;
     }
 
     @Override
     public long getPeriod() {
-        return this.jobLocation.getPeriod();
+       return this.jobLocation.getPeriod();
     }
 
     @Override
     public int getRepeat() {
-        return this.jobLocation.getRepeat();
+       return this.jobLocation.getRepeat();
     }
 
     @Override
     public long getStart() {
-        return this.jobLocation.getStartTime();
+       return this.jobLocation.getStartTime();
     }
 
     @Override
@@ -76,13 +76,4 @@ public class JobImpl implements Job {
         return JobSupport.getDateTime(getStart());
     }
 
-    @Override
-    public int getExecutionCount() {
-        return this.jobLocation.getRescheduledCount();
-    }
-
-    @Override
-    public String toString() {
-        return "Job: " + getJobId();
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
index cb66145..13cf376 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
@@ -36,8 +36,6 @@ class JobLocation {
     private long period;
     private String cronEntry;
     private final Location location;
-    private int rescheduledCount;
-    private Location lastUpdate;
 
     public JobLocation(Location location) {
         this.location = location;
@@ -54,12 +52,8 @@ class JobLocation {
         this.delay = in.readLong();
         this.nextTime = in.readLong();
         this.period = in.readLong();
-        this.cronEntry = in.readUTF();
+        this.cronEntry=in.readUTF();
         this.location.readExternal(in);
-        if (in.readBoolean()) {
-            this.lastUpdate = new Location();
-            this.lastUpdate.readExternal(in);
-        }
     }
 
     public void writeExternal(DataOutput out) throws IOException {
@@ -69,17 +63,11 @@ class JobLocation {
         out.writeLong(this.delay);
         out.writeLong(this.nextTime);
         out.writeLong(this.period);
-        if (this.cronEntry == null) {
-            this.cronEntry = "";
+        if (this.cronEntry==null) {
+            this.cronEntry="";
         }
         out.writeUTF(this.cronEntry);
         this.location.writeExternal(out);
-        if (lastUpdate != null) {
-            out.writeBoolean(true);
-            this.lastUpdate.writeExternal(out);
-        } else {
-            out.writeBoolean(false);
-        }
     }
 
     /**
@@ -135,8 +123,7 @@ class JobLocation {
     }
 
     /**
-     * @param nextTime
-     *            the nextTime to set
+     * @param nextTime the nextTime to set
      */
     public synchronized void setNextTime(long nextTime) {
         this.nextTime = nextTime;
@@ -165,8 +152,7 @@ class JobLocation {
     }
 
     /**
-     * @param cronEntry
-     *            the cronEntry to set
+     * @param cronEntry the cronEntry to set
      */
     public synchronized void setCronEntry(String cronEntry) {
         this.cronEntry = cronEntry;
@@ -187,8 +173,7 @@ class JobLocation {
     }
 
     /**
-     * @param delay
-     *            the delay to set
+     * @param delay the delay to set
      */
     public void setDelay(long delay) {
         this.delay = delay;
@@ -201,55 +186,15 @@ class JobLocation {
         return this.location;
     }
 
-    /**
-     * @returns the location in the journal of the last update issued for this
-     *          Job.
-     */
-    public Location getLastUpdate() {
-        return this.lastUpdate;
-    }
-
-    /**
-     * Sets the location of the last update command written to the journal for
-     * this Job. The update commands set the next execution time for this job.
-     * We need to keep track of only the latest update as it's the only one we
-     * really need to recover the correct state from the journal.
-     *
-     * @param location
-     *            The location in the journal of the last update command.
-     */
-    public void setLastUpdate(Location location) {
-        this.lastUpdate = location;
-    }
-
-    /**
-     * @return the number of time this job has been rescheduled.
-     */
-    public int getRescheduledCount() {
-        return rescheduledCount;
-    }
-
-    /**
-     * Sets the number of time this job has been rescheduled.  A newly added job will return
-     * zero and increment this value each time a scheduled message is dispatched to its
-     * target destination and the job is rescheduled for another cycle.
-     *
-     * @param executionCount
-     *        the new execution count to assign the JobLocation.
-     */
-    public void setRescheduledCount(int rescheduledCount) {
-        this.rescheduledCount = rescheduledCount;
-    }
-
     @Override
     public String toString() {
-        return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime="
-            + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]";
+        return "Job [id=" + jobId + ", startTime=" + new Date(startTime)
+                + ", delay=" + delay + ", period=" + period + ", repeat="
+                + repeat + ", nextTime=" + new Date(nextTime) + "]";
     }
 
     static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
         static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
-
         @Override
         public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
             List<JobLocation> result = new ArrayList<JobLocation>();
@@ -283,7 +228,6 @@ class JobLocation {
         result = prime * result + (int) (period ^ (period >>> 32));
         result = prime * result + repeat;
         result = prime * result + (int) (startTime ^ (startTime >>> 32));
-        result = prime * result + (rescheduledCount ^ (rescheduledCount >>> 32));
         return result;
     }
 
@@ -342,9 +286,6 @@ class JobLocation {
         if (startTime != other.startTime) {
             return false;
         }
-        if (rescheduledCount != other.rescheduledCount) {
-            return false;
-        }
 
         return true;
     }


Mime
View raw message