activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [2/5] activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-3758"
Date Thu, 25 Dec 2014 04:02:24 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 1a08931..5934914 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -19,10 +19,8 @@ package org.apache.activemq.store.kahadb.scheduler;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -30,917 +28,363 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
-import org.apache.activemq.store.kahadb.JournalCommand;
-import org.apache.activemq.store.kahadb.KahaDBMetaData;
-import org.apache.activemq.store.kahadb.Visitor;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Page;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
-
-    private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this);
-    private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
-    private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
-    private File legacyStoreArchiveDirectory;
-
-    /**
-     * The Scheduler Token is used to identify base revisions of the Scheduler store.  A store
-     * based on the initial scheduler design will not have this tag in it's meta-data and will
-     * indicate an update is needed.  Later versions of the scheduler can also change this value
-     * to indicate incompatible store bases which require complete meta-data and journal rewrites
-     * instead of simpler meta-data updates.
-     */
-    static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409");
-
-    /**
-     * The default scheduler store version.  All new store instance will be given this version and
-     * earlier versions will be updated to this version.
-     */
-    static final int CURRENT_VERSION = 1;
-
-    @Override
-    public JobScheduler getJobScheduler(final String name) throws Exception {
-        this.indexLock.writeLock().lock();
-        try {
-            JobSchedulerImpl result = this.schedulers.get(name);
-            if (result == null) {
-                final JobSchedulerImpl js = new JobSchedulerImpl(this);
-                js.setName(name);
-                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                    @Override
-                    public void execute(Transaction tx) throws IOException {
-                        js.createIndexes(tx);
-                        js.load(tx);
-                        metaData.getJobSchedulers().put(tx, name, js);
-                    }
-                });
-                result = js;
-                this.schedulers.put(name, js);
-                if (isStarted()) {
-                    result.start();
-                }
-                this.pageFile.flush();
-            }
-            return result;
-        } finally {
-            this.indexLock.writeLock().unlock();
+public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
+    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
+    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+
+    private File directory;
+    PageFile pageFile;
+    private Journal journal;
+    protected AtomicLong journalSize = new AtomicLong(0);
+    private boolean failIfDatabaseIsLocked;
+    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    private boolean enableIndexWriteAsync = false;
+    MetaData metaData = new MetaData(this);
+    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
+    Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
+
+    protected class MetaData {
+        protected MetaData(JobSchedulerStoreImpl store) {
+            this.store = store;
         }
-    }
 
-    @Override
-    public boolean removeJobScheduler(final String name) throws Exception {
-        boolean result = false;
+        private final JobSchedulerStoreImpl store;
+        Page<MetaData> page;
+        BTreeIndex<Integer, Integer> journalRC;
+        BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
 
-        this.indexLock.writeLock().lock();
-        try {
-            final JobSchedulerImpl js = this.schedulers.remove(name);
-            result = js != null;
-            if (result) {
-                js.stop();
-                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                    @Override
-                    public void execute(Transaction tx) throws IOException {
-                        metaData.getJobSchedulers().remove(tx, name);
-                        js.removeAll(tx);
-                    }
-                });
-            }
-        } finally {
-            this.indexLock.writeLock().unlock();
+        void createIndexes(Transaction tx) throws IOException {
+            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
+            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
         }
-        return result;
-    }
-
-    /**
-     * Sets the directory where the legacy scheduler store files are archived before an
-     * update attempt is made.  Both the legacy index files and the journal files are moved
-     * to this folder prior to an upgrade attempt.
-     *
-     * @param directory
-     *      The directory to move the legacy Scheduler Store files to.
-     */
-    public void setLegacyStoreArchiveDirectory(File directory) {
-        this.legacyStoreArchiveDirectory = directory;
-    }
 
-    /**
-     * Gets the directory where the legacy Scheduler Store files will be archived if the
-     * broker is started and an existing Job Scheduler Store from an old version is detected.
-     *
-     * @return the directory where scheduler store legacy files are archived on upgrade.
-     */
-    public File getLegacyStoreArchiveDirectory() {
-        if (this.legacyStoreArchiveDirectory == null) {
-            this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore");
+        void load(Transaction tx) throws IOException {
+            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            this.storedSchedulers.load(tx);
+            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.load(tx);
         }
 
-        return this.legacyStoreArchiveDirectory.getAbsoluteFile();
-    }
-
-    @Override
-    public void load() throws IOException {
-        if (opened.compareAndSet(false, true)) {
-            getJournal().start();
-            try {
-                loadPageFile();
-            } catch (UnknownStoreVersionException ex) {
-                LOG.info("Can't start until store update is performed.");
-                upgradeFromLegacy();
-                // Restart with the updated store
-                getJournal().start();
-                loadPageFile();
-                LOG.info("Update from legacy Scheduler store completed successfully.");
-            } catch (Throwable t) {
-                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString());
-                LOG.debug("Index load failure", t);
-
-                // try to recover index
-                try {
-                    pageFile.unload();
-                } catch (Exception ignore) {
-                }
-                if (isArchiveCorruptedIndex()) {
-                    pageFile.archive();
-                } else {
-                    pageFile.delete();
-                }
-                metaData = new JobSchedulerKahaDBMetaData(this);
-                pageFile = null;
-                loadPageFile();
+        void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
+            for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
+                Entry<String, JobSchedulerImpl> entry = i.next();
+                entry.getValue().load(tx);
+                schedulers.put(entry.getKey(), entry.getValue());
             }
-            startCheckpoint();
-            recover();
         }
-        LOG.info("{} started.", this);
-    }
-
-    @Override
-    public void unload() throws IOException {
-        if (opened.compareAndSet(true, false)) {
-            for (JobSchedulerImpl js : this.schedulers.values()) {
-                try {
-                    js.stop();
-                } catch (Exception e) {
-                    throw new IOException(e);
-                }
-            }
-            this.indexLock.writeLock().lock();
-            try {
-                if (pageFile != null && pageFile.isLoaded()) {
-                    metaData.setState(KahaDBMetaData.CLOSED_STATE);
-
-                    if (metaData.getPage() != null) {
-                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                            @Override
-                            public void execute(Transaction tx) throws IOException {
-                                tx.store(metaData.getPage(), metaDataMarshaller, true);
-                            }
-                        });
-                    }
-                }
-            } finally {
-                this.indexLock.writeLock().unlock();
-            }
 
-            checkpointLock.writeLock().lock();
-            try {
-                if (metaData.getPage() != null) {
-                    checkpointUpdate(true);
-                }
-            } finally {
-                checkpointLock.writeLock().unlock();
-            }
-            synchronized (checkpointThreadLock) {
-                if (checkpointThread != null) {
-                    try {
-                        checkpointThread.join();
-                        checkpointThread = null;
-                    } catch (InterruptedException e) {
-                    }
-                }
-            }
-
-            if (pageFile != null) {
-                pageFile.unload();
-                pageFile = null;
-            }
-            if (this.journal != null) {
-                journal.close();
-                journal = null;
-            }
-
-            metaData = new JobSchedulerKahaDBMetaData(this);
+        public void read(DataInput is) throws IOException {
+            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
+            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
+            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
         }
-        LOG.info("{} stopped.", this);
-    }
 
-    private void loadPageFile() throws IOException {
-        this.indexLock.writeLock().lock();
-        try {
-            final PageFile pageFile = getPageFile();
-            pageFile.load();
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    if (pageFile.getPageCount() == 0) {
-                        Page<JobSchedulerKahaDBMetaData> page = tx.allocate();
-                        assert page.getPageId() == 0;
-                        page.set(metaData);
-                        metaData.setPage(page);
-                        metaData.setState(KahaDBMetaData.CLOSED_STATE);
-                        metaData.initialize(tx);
-                        tx.store(metaData.getPage(), metaDataMarshaller, true);
-                    } else {
-                        Page<JobSchedulerKahaDBMetaData> page = null;
-                        page = tx.load(0, metaDataMarshaller);
-                        metaData = page.get();
-                        metaData.setPage(page);
-                    }
-                    metaData.load(tx);
-                    metaData.loadScheduler(tx, schedulers);
-                    for (JobSchedulerImpl js : schedulers.values()) {
-                        try {
-                            js.start();
-                        } catch (Exception e) {
-                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
-                        }
-                    }
-                }
-            });
-
-            pageFile.flush();
-        } finally {
-            this.indexLock.writeLock().unlock();
+        public void write(DataOutput os) throws IOException {
+            os.writeLong(this.storedSchedulers.getPageId());
+            os.writeLong(this.journalRC.getPageId());
         }
     }
 
-    private void upgradeFromLegacy() throws IOException {
-
-        journal.close();
-        journal = null;
-        try {
-            pageFile.unload();
-            pageFile = null;
-        } catch (Exception ignore) {}
-
-        File storeDir = getDirectory().getAbsoluteFile();
-        File storeArchiveDir = getLegacyStoreArchiveDirectory();
-
-        LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir);
-
-        // Move only the known store files, locks and other items left in place.
-        IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() {
-
-            @Override
-            public boolean accept(File dir, String name) {
-                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-
-        // We reset everything to clean state, then we can read from the old
-        // scheduler store and replay the scheduled jobs into this one as adds.
-        getJournal().start();
-        metaData = new JobSchedulerKahaDBMetaData(this);
-        pageFile = null;
-        loadPageFile();
-
-        LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory());
-        replayer.load();
-        replayer.startReplay(this);
-
-        // Cleanup after replay and store what we've done.
-        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            @Override
-            public void execute(Transaction tx) throws IOException {
-                tx.store(metaData.getPage(), metaDataMarshaller, true);
-            }
-        });
-
-        checkpointUpdate(true);
-        getJournal().close();
-        getPageFile().unload();
-    }
-
-    @Override
-    protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
-        LOG.debug("Job Scheduler Store Checkpoint started.");
-
-        // reflect last update exclusive of current checkpoint
-        Location lastUpdate = metaData.getLastUpdateLocation();
-        metaData.setState(KahaDBMetaData.OPEN_STATE);
-        tx.store(metaData.getPage(), metaDataMarshaller, true);
-        pageFile.flush();
-
-        if (cleanup) {
-            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
-
-            LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet);
-
-            if (lastUpdate != null) {
-                gcCandidateSet.remove(lastUpdate.getDataFileId());
-            }
-
-            this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() {
-
-                @Override
-                public void visit(List<Integer> keys, List<Integer> values) {
-                    for (Integer key : keys) {
-                        if (gcCandidateSet.remove(key)) {
-                            LOG.trace("Removed referenced file: {} from GC set", key);
-                        }
-                    }
-                }
-
-                @Override
-                public boolean isInterestedInKeysBetween(Integer first, Integer second) {
-                    return true;
-                }
-            });
-
-            LOG.trace("gc candidates after reference check: {}", gcCandidateSet);
-
-            // If there are GC candidates then check the remove command location to see
-            // if any of them can go or if they must stay in order to ensure proper recover.
-            //
-            // A log containing any remove commands must be kept until all the logs with the
-            // add commands for all the removed jobs have been dropped.
-            if (!gcCandidateSet.isEmpty()) {
-                Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
-                List<Integer> orphans = new ArrayList<Integer>();
-                while (removals.hasNext()) {
-                    boolean orphanedRemve = true;
-                    Entry<Integer, List<Integer>> entry = removals.next();
-
-                    // If this log is not a GC candidate then there's no need to do a check to rule it out
-                    if (gcCandidateSet.contains(entry.getKey())) {
-                        for (Integer addLocation : entry.getValue()) {
-                            if (completeFileSet.contains(addLocation)) {
-                                orphanedRemve = false;
-                                break;
-                            }
-                        }
-
-                        // If it's not orphaned than we can't remove it, otherwise we
-                        // stop tracking it it's log will get deleted on the next check.
-                        if (!orphanedRemve) {
-                            LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
-                            gcCandidateSet.remove(entry.getKey());
-                        } else {
-                            LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
-                            orphans.add(entry.getKey());
-                        }
-                    }
-                }
-
-                // Drop all orphaned removes from the tracker.
-                for (Integer orphan : orphans) {
-                    metaData.getRemoveLocationTracker().remove(tx, orphan);
-                }
-            }
+    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
+        private final JobSchedulerStoreImpl store;
 
-            LOG.trace("gc candidates after removals check: {}", gcCandidateSet);
-            if (!gcCandidateSet.isEmpty()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
-                }
-                journal.removeDataFiles(gcCandidateSet);
-            }
+        MetaDataMarshaller(JobSchedulerStoreImpl store) {
+            this.store = store;
         }
 
-        LOG.debug("Job Scheduler Store Checkpoint complete.");
-    }
+        @Override
+        public MetaData readPayload(DataInput dataIn) throws IOException {
+            MetaData rc = new MetaData(this.store);
+            rc.read(dataIn);
+            return rc;
+        }
 
-    /**
-     * Adds a reference for the journal log file pointed to by the given Location value.
-     *
-     * To prevent log files in the journal that still contain valid data that needs to be
-     * kept in order to allow for recovery the logs must have active references.  Each Job
-     * scheduler should ensure that the logs are accurately referenced.
-     *
-     * @param tx
-     *      The TX under which the update is to be performed.
-     * @param location
-     *      The location value to update the reference count of.
-     *
-     * @throws IOException if an error occurs while updating the journal references table.
-     */
-    protected void incrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        Integer val = metaData.getJournalRC().get(tx, logId);
-        int refCount = val != null ? val.intValue() + 1 : 1;
-        metaData.getJournalRC().put(tx, logId, refCount);
+        @Override
+        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
     }
 
-    /**
-     * Removes one reference for the Journal log file indicated in the given Location value.
-     *
-     * The references are used to track which log files cannot be GC'd.  When the reference count
-     * on a log file reaches zero the file id is removed from the tracker and the log will be
-     * removed on the next check point update.
-     *
-     * @param tx
-     *      The TX under which the update is to be performed.
-     * @param location
-     *      The location value to update the reference count of.
-     *
-     * @throws IOException if an error occurs while updating the journal references table.
-     */
-    protected void decrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        Integer refCount = metaData.getJournalRC().get(tx, logId);
-        if (refCount != null) {
-            int refCountValue = refCount;
-            refCountValue--;
-            if (refCountValue <= 0) {
-                metaData.getJournalRC().remove(tx, logId);
-            } else {
-                metaData.getJournalRC().put(tx, logId, refCountValue);
+    class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+        @Override
+        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+            List<JobLocation> result = new ArrayList<JobLocation>();
+            int size = dataIn.readInt();
+            for (int i = 0; i < size; i++) {
+                JobLocation jobLocation = new JobLocation();
+                jobLocation.readExternal(dataIn);
+                result.add(jobLocation);
             }
+            return result;
         }
-    }
 
-    /**
-     * Updates the Job removal tracking index with the location of a remove command and the
-     * original JobLocation entry.
-     *
-     * The JobLocation holds the locations in the logs where the add and update commands for
-     * a job stored.  The log file containing the remove command can only be discarded after
-     * both the add and latest update log files have also been discarded.
-     *
-     * @param tx
-     *      The TX under which the update is to be performed.
-     * @param location
-     *      The location value to reference a remove command.
-     * @param removedJob
-     *      The original JobLocation instance that holds the add and update locations
-     *
-     * @throws IOException if an error occurs while updating the remove location tracker.
-     */
-    protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException {
-        int logId = location.getDataFileId();
-        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
-        if (removed == null) {
-            removed = new ArrayList<Integer>();
+        @Override
+        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            for (JobLocation jobLocation : value) {
+                jobLocation.writeExternal(dataOut);
+            }
         }
-        removed.add(removedJob.getLocation().getDataFileId());
-        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
-    }
-
-    /**
-     * Retrieve the scheduled Job's byte blob from the journal.
-     *
-     * @param location
-     *      The location of the KahaAddScheduledJobCommand that originated the Job.
-     *
-     * @return a ByteSequence containing the payload of the scheduled Job.
-     *
-     * @throws IOException if an error occurs while reading the payload value.
-     */
-    protected ByteSequence getPayload(Location location) throws IOException {
-        KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location);
-        Buffer payload = job.getPayload();
-        return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength());
-    }
-
-    public void readLockIndex() {
-        this.indexLock.readLock().lock();
-    }
-
-    public void readUnlockIndex() {
-        this.indexLock.readLock().unlock();
-    }
-
-    public void writeLockIndex() {
-        this.indexLock.writeLock().lock();
-    }
-
-    public void writeUnlockIndex() {
-        this.indexLock.writeLock().unlock();
-    }
-
-    @Override
-    public String toString() {
-        return "JobSchedulerStore: " + getDirectory();
-    }
-
-    @Override
-    protected String getPageFileName() {
-        return "scheduleDB";
-    }
-
-    @Override
-    protected File getDefaultDataDirectory() {
-        return new File(IOHelper.getDefaultDataDirectory(), "delayedDB");
     }
 
-    private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> {
-
+    class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
         private final JobSchedulerStoreImpl store;
 
-        MetaDataMarshaller(JobSchedulerStoreImpl store) {
+        JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
             this.store = store;
         }
 
         @Override
-        public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException {
-            JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store);
-            rc.read(dataIn);
-            return rc;
+        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
+            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
+            result.read(dataIn);
+            return result;
         }
 
         @Override
-        public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException {
-            object.write(dataOut);
+        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
+            js.write(dataOut);
         }
     }
 
-    /**
-     * Called during index recovery to rebuild the index from the last known good location.  For
-     * entries that occur before the last known good position we just ignore then and move on.
-     *
-     * @param command
-     *        the command read from the Journal which should be used to update the index.
-     * @param location
-     *        the location in the index where the command was read.
-     * @param inDoubtlocation
-     *        the location in the index known to be the last time the index was valid.
-     *
-     * @throws IOException if an error occurs while recovering the index.
-     */
-    protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
-        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
-            process(data, location);
-        }
+    @Override
+    public File getDirectory() {
+        return directory;
     }
 
-    /**
-     * Called during recovery to allow the store to rebuild from scratch.
-     *
-     * @param data
-     *      The command to process, which was read from the Journal.
-     * @param location
-     *      The location of the command in the Journal.
-     *
-     * @throws IOException if an error occurs during command processing.
-     */
     @Override
-    protected void process(JournalCommand<?> data, final Location location) throws IOException {
-        data.visit(new Visitor() {
-            @Override
-            public void visit(final KahaAddScheduledJobCommand command) throws IOException {
-                final JobSchedulerImpl scheduler;
-
-                indexLock.writeLock().lock();
-                try {
-                    try {
-                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            scheduler.process(tx, command, location);
-                        }
-                    });
-
-                    processLocation(location);
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            }
-
-            @Override
-            public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
-                final JobSchedulerImpl scheduler;
-
-                indexLock.writeLock().lock();
-                try {
-                    try {
-                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            scheduler.process(tx, command, location);
-                        }
-                    });
-
-                    processLocation(location);
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            }
-
-            @Override
-            public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
-                final JobSchedulerImpl scheduler;
-
-                indexLock.writeLock().lock();
-                try {
-                    try {
-                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            scheduler.process(tx, command, location);
-                        }
-                    });
-
-                    processLocation(location);
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            }
-
-            @Override
-            public void visit(final KahaRescheduleJobCommand command) throws IOException {
-                final JobSchedulerImpl scheduler;
-
-                indexLock.writeLock().lock();
-                try {
-                    try {
-                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            scheduler.process(tx, command, location);
-                        }
-                    });
-
-                    processLocation(location);
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            }
-
-            @Override
-            public void visit(final KahaDestroySchedulerCommand command) {
-                try {
-                    removeJobScheduler(command.getScheduler());
-                } catch (Exception e) {
-                    LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
-                }
-
-                processLocation(location);
-            }
-
-            @Override
-            public void visit(KahaTraceCommand command) {
-                processLocation(location);
-            }
-        });
+    public void setDirectory(File directory) {
+        this.directory = directory;
     }
 
-    protected void processLocation(final Location location) {
-        indexLock.writeLock().lock();
+    @Override
+    public long size() {
+        if (!isStarted()) {
+            return 0;
+        }
         try {
-            this.metaData.setLastUpdateLocation(location);
-        } finally {
-            indexLock.writeLock().unlock();
+            return journalSize.get() + pageFile.getDiskSize();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    /**
-     * We recover from the Journal logs as needed to restore the index.
-     *
-     * @throws IllegalStateException
-     * @throws IOException
-     */
-    private void recover() throws IllegalStateException, IOException {
-        this.indexLock.writeLock().lock();
-        try {
-            long start = System.currentTimeMillis();
-            Location lastIndoubtPosition = getRecoveryPosition();
-            Location recoveryPosition = lastIndoubtPosition;
-
-            if (recoveryPosition != null) {
-                int redoCounter = 0;
-                LOG.info("Recovering from the journal ...");
-                while (recoveryPosition != null) {
-                    JournalCommand<?> message = load(recoveryPosition);
-                    metaData.setLastUpdateLocation(recoveryPosition);
-                    doRecover(message, recoveryPosition, lastIndoubtPosition);
-                    redoCounter++;
-                    recoveryPosition = journal.getNextLocation(recoveryPosition);
-                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
-                         LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);
-                     }
+    @Override
+    public JobScheduler getJobScheduler(final String name) throws Exception {
+        JobSchedulerImpl result = this.schedulers.get(name);
+        if (result == null) {
+            final JobSchedulerImpl js = new JobSchedulerImpl(this);
+            js.setName(name);
+            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
+                public void execute(Transaction tx) throws IOException {
+                    js.createIndexes(tx);
+                    js.load(tx);
+                    metaData.storedSchedulers.put(tx, name, js);
                 }
-                long end = System.currentTimeMillis();
-                LOG.info("Recovery replayed {} operations from the journal in {} seconds.",
-                         redoCounter, ((end - start) / 1000.0f));
+            });
+            result = js;
+            this.schedulers.put(name, js);
+            if (isStarted()) {
+                result.start();
             }
+            this.pageFile.flush();
+        }
+        return result;
+    }
 
-            // We may have to undo some index updates.
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+    @Override
+    synchronized public boolean removeJobScheduler(final String name) throws Exception {
+        boolean result = false;
+        final JobSchedulerImpl js = this.schedulers.remove(name);
+        result = js != null;
+        if (result) {
+            js.stop();
+            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 @Override
                 public void execute(Transaction tx) throws IOException {
-                    recoverIndex(tx);
+                    metaData.storedSchedulers.remove(tx, name);
+                    js.destroy(tx);
                 }
             });
-
-        } finally {
-            this.indexLock.writeLock().unlock();
         }
+        return result;
     }
 
-    private Location getRecoveryPosition() throws IOException {
-        // This loads the first position and we completely rebuild the index if we
-        // do not override it with some known recovery start location.
-        Location result = null;
-
-        if (!isForceRecoverIndex()) {
-            if (metaData.getLastUpdateLocation() != null) {
-                result = metaData.getLastUpdateLocation();
-            }
+    @Override
+    protected synchronized void doStart() throws Exception {
+        if (this.directory == null) {
+            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
         }
+        IOHelper.mkdirs(this.directory);
+        this.journal = new Journal();
+        this.journal.setDirectory(directory);
+        this.journal.setMaxFileLength(getJournalMaxFileLength());
+        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+        this.journal.setSizeAccumulator(this.journalSize);
+        this.journal.start();
+        this.pageFile = new PageFile(directory, "scheduleDB");
+        this.pageFile.setWriteBatchSize(1);
+        this.pageFile.load();
+
+        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                if (pageFile.getPageCount() == 0) {
+                    Page<MetaData> page = tx.allocate();
+                    assert page.getPageId() == 0;
+                    page.set(metaData);
+                    metaData.page = page;
+                    metaData.createIndexes(tx);
+                    tx.store(metaData.page, metaDataMarshaller, true);
 
-        return journal.getNextLocation(result);
-    }
-
-    private void recoverIndex(Transaction tx) throws IOException {
-        long start = System.currentTimeMillis();
-
-        // It is possible index updates got applied before the journal updates..
-        // in that case we need to removed references to Jobs that are not in the journal
-        final Location lastAppendLocation = journal.getLastAppendLocation();
-        long undoCounter = 0;
-
-        // Go through all the jobs in each scheduler and check if any are added after
-        // the last appended location and remove those.  For now we ignore the update
-        // location since the scheduled job will update itself after the next fire and
-        // a new update will replace any existing update.
-        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
-            Map.Entry<String, JobSchedulerImpl> entry = i.next();
-            JobSchedulerImpl scheduler = entry.getValue();
-
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
-                if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
-                    if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
-                        LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
-                        undoCounter++;
+                } else {
+                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
+                    metaData = page.get();
+                    metaData.page = page;
+                }
+                metaData.load(tx);
+                metaData.loadScheduler(tx, schedulers);
+                for (JobSchedulerImpl js : schedulers.values()) {
+                    try {
+                        js.start();
+                    } catch (Exception e) {
+                        JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
                     }
                 }
             }
-        }
-
-        if (undoCounter > 0) {
-            // The rolled back operations are basically in flight journal writes.  To avoid getting
-            // these the end user should do sync writes to the journal.
-            long end = System.currentTimeMillis();
-            LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
-            undoCounter = 0;
-        }
-
-        // Now we check for missing and corrupt journal files.
+        });
 
-        // 1. Collect the set of all referenced journal files based on the Location of the
-        //    the scheduled jobs and the marked last update field.
-        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
-        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
-            Map.Entry<String, JobSchedulerImpl> entry = i.next();
-            JobSchedulerImpl scheduler = entry.getValue();
+        this.pageFile.flush();
+        LOG.info(this + " started");
+    }
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
-                missingJournalFiles.add(job.getLocation().getDataFileId());
-                if (job.getLastUpdate() != null) {
-                    missingJournalFiles.add(job.getLastUpdate().getDataFileId());
-                }
-            }
+    @Override
+    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+        for (JobSchedulerImpl js : this.schedulers.values()) {
+            js.stop();
         }
-
-        // 2. Remove from that set all known data file Id's in the journal and what's left
-        //    is the missing set which will soon also contain the corrupted set.
-        missingJournalFiles.removeAll(journal.getFileMap().keySet());
-        if (!missingJournalFiles.isEmpty()) {
-            LOG.info("Some journal files are missing: {}", missingJournalFiles);
+        if (this.pageFile != null) {
+            this.pageFile.unload();
         }
+        if (this.journal != null) {
+            journal.close();
+        }
+        LOG.info(this + " stopped");
+    }
 
-        // 3. Now check all references in the journal logs for corruption and add any
-        //    corrupt journal files to the missing set.
-        HashSet<Location> corruptedLocations = new HashSet<Location>();
-
-        if (isCheckForCorruptJournalFiles()) {
-            Collection<DataFile> dataFiles = journal.getFileMap().values();
-            for (DataFile dataFile : dataFiles) {
-                int id = dataFile.getDataFileId();
-                for (long offset : dataFile.getCorruptedBlocks()) {
-                    corruptedLocations.add(new Location(id, (int) offset));
-                }
-            }
+    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        Integer val = this.metaData.journalRC.get(tx, logId);
+        int refCount = val != null ? val.intValue() + 1 : 1;
+        this.metaData.journalRC.put(tx, logId, refCount);
+    }
 
-            if (!corruptedLocations.isEmpty()) {
-                LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size());
-            }
+    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        int refCount = this.metaData.journalRC.get(tx, logId);
+        refCount--;
+        if (refCount <= 0) {
+            this.metaData.journalRC.remove(tx, logId);
+            Set<Integer> set = new HashSet<Integer>();
+            set.add(logId);
+            this.journal.removeDataFiles(set);
+        } else {
+            this.metaData.journalRC.put(tx, logId, refCount);
         }
+    }
 
-        // 4. Now we either fail or we remove all references to missing or corrupt journal
-        //    files from the various JobSchedulerImpl instances.  We only remove the Job if
-        //    the initial Add operation is missing when the ignore option is set, the updates
-        //    could be lost but that's price you pay when ignoring the missing logs.
-        if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) {
-            if (!isIgnoreMissingJournalfiles()) {
-                throw new IOException("Detected missing/corrupt journal files.");
-            }
+    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+        ByteSequence result = null;
+        result = this.journal.read(location);
+        return result;
+    }
 
-            // Remove all Jobs that reference an Location that is either missing or corrupt.
-            undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations);
+    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+        return this.journal.write(payload, sync);
+    }
 
-            // Clean up the Journal Reference count Map.
-            removeJournalRCForMissingFiles(tx, missingJournalFiles);
-        }
+    PageFile getPageFile() {
+        this.pageFile.isLoaded();
+        return this.pageFile;
+    }
 
-        if (undoCounter > 0) {
-            long end = System.currentTimeMillis();
-            LOG.info("Detected missing/corrupt journal files.  Dropped {} jobs from the " +
-                     "index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
-        }
+    public boolean isFailIfDatabaseIsLocked() {
+        return failIfDatabaseIsLocked;
     }
 
-    private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException {
-        List<Integer> matches = new ArrayList<Integer>();
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+    }
 
-        Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx);
-        while (references.hasNext()) {
-            int dataFileId = references.next().getKey();
-            if (missing.contains(dataFileId)) {
-                matches.add(dataFileId);
-            }
-        }
+    public int getJournalMaxFileLength() {
+        return journalMaxFileLength;
+    }
 
-        for (Integer match : matches) {
-            metaData.getJournalRC().remove(tx, match);
-        }
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
     }
 
-    private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException {
-        int removed = 0;
+    public int getJournalMaxWriteBatchSize() {
+        return journalMaxWriteBatchSize;
+    }
 
-        // Remove Jobs that reference missing or corrupt files.
-        // Remove Reference counts to missing or corrupt files.
-        // Remove and remove command markers to missing or corrupt files.
-        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
-            Map.Entry<String, JobSchedulerImpl> entry = i.next();
-            JobSchedulerImpl scheduler = entry.getValue();
+    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
+    }
 
-            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
-            for (JobLocation job : jobs) {
+    public boolean isEnableIndexWriteAsync() {
+        return enableIndexWriteAsync;
+    }
 
-                // Remove all jobs in missing log files.
-                if (missing.contains(job.getLocation().getDataFileId())) {
-                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
-                    removed++;
-                    continue;
-                }
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        this.enableIndexWriteAsync = enableIndexWriteAsync;
+    }
 
-                // Remove all jobs in corrupted parts of log files.
-                if (corrupted.contains(job.getLocation())) {
-                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
-                    removed++;
-                }
-            }
-        }
+    @Override
+    public String toString() {
+        return "JobSchedulerStore:" + this.directory;
+    }
 
-        return removed;
+    @Override
+    public Locker createDefaultLocker() throws IOException {
+        SharedFileLocker locker = new SharedFileLocker();
+        locker.setDirectory(this.getDirectory());
+        return locker;
+    }
+
+    @Override
+    public void init() throws Exception {
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
deleted file mode 100644
index 5146d84..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.IOException;
-
-public class UnknownStoreVersionException extends IOException {
-
-    private static final long serialVersionUID = -8544753506151157145L;
-
-    private final String token;
-
-    public UnknownStoreVersionException(Throwable cause) {
-        super(cause);
-        this.token = "";
-    }
-
-    public UnknownStoreVersionException(String token) {
-        super("Failed to load Store, found unknown store token: " + token);
-        this.token = token;
-    }
-
-    public String getToken() {
-        return this.token;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
deleted file mode 100644
index 2562f50..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.util.ByteSequence;
-
-/**
- * Legacy version Job and Job payload wrapper.  Allows for easy replay of stored
- * legacy jobs into a new JobSchedulerStoreImpl intsance.
- */
-final class LegacyJobImpl {
-
-    private final LegacyJobLocation jobLocation;
-    private final Buffer payload;
-
-    protected LegacyJobImpl(LegacyJobLocation location, ByteSequence payload) {
-        this.jobLocation = location;
-        this.payload = new Buffer(payload.data, payload.offset, payload.length);
-    }
-
-    public String getJobId() {
-        return this.jobLocation.getJobId();
-    }
-
-    public Buffer getPayload() {
-       return this.payload;
-    }
-
-    public long getPeriod() {
-       return this.jobLocation.getPeriod();
-    }
-
-    public int getRepeat() {
-       return this.jobLocation.getRepeat();
-    }
-
-    public long getDelay() {
-        return this.jobLocation.getDelay();
-    }
-
-    public String getCronEntry() {
-        return this.jobLocation.getCronEntry();
-    }
-
-    public long getNextExecutionTime() {
-        return this.jobLocation.getNextTime();
-    }
-
-    public long getStartTime() {
-        return this.jobLocation.getStartTime();
-    }
-
-    @Override
-    public String toString() {
-        return this.jobLocation.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
deleted file mode 100644
index 8437064..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-
-final class LegacyJobLocation {
-
-    private String jobId;
-    private int repeat;
-    private long startTime;
-    private long delay;
-    private long nextTime;
-    private long period;
-    private String cronEntry;
-    private final Location location;
-
-    public LegacyJobLocation(Location location) {
-        this.location = location;
-    }
-
-    public LegacyJobLocation() {
-        this(new Location());
-    }
-
-    public void readExternal(DataInput in) throws IOException {
-        this.jobId = in.readUTF();
-        this.repeat = in.readInt();
-        this.startTime = in.readLong();
-        this.delay = in.readLong();
-        this.nextTime = in.readLong();
-        this.period = in.readLong();
-        this.cronEntry = in.readUTF();
-        this.location.readExternal(in);
-    }
-
-    public void writeExternal(DataOutput out) throws IOException {
-        out.writeUTF(this.jobId);
-        out.writeInt(this.repeat);
-        out.writeLong(this.startTime);
-        out.writeLong(this.delay);
-        out.writeLong(this.nextTime);
-        out.writeLong(this.period);
-        if (this.cronEntry == null) {
-            this.cronEntry = "";
-        }
-        out.writeUTF(this.cronEntry);
-        this.location.writeExternal(out);
-    }
-
-    /**
-     * @return the jobId
-     */
-    public String getJobId() {
-        return this.jobId;
-    }
-
-    /**
-     * @param jobId
-     *            the jobId to set
-     */
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
-    }
-
-    /**
-     * @return the repeat
-     */
-    public int getRepeat() {
-        return this.repeat;
-    }
-
-    /**
-     * @param repeat
-     *            the repeat to set
-     */
-    public void setRepeat(int repeat) {
-        this.repeat = repeat;
-    }
-
-    /**
-     * @return the start
-     */
-    public long getStartTime() {
-        return this.startTime;
-    }
-
-    /**
-     * @param start
-     *            the start to set
-     */
-    public void setStartTime(long start) {
-        this.startTime = start;
-    }
-
-    /**
-     * @return the nextTime
-     */
-    public synchronized long getNextTime() {
-        return this.nextTime;
-    }
-
-    /**
-     * @param nextTime
-     *            the nextTime to set
-     */
-    public synchronized void setNextTime(long nextTime) {
-        this.nextTime = nextTime;
-    }
-
-    /**
-     * @return the period
-     */
-    public long getPeriod() {
-        return this.period;
-    }
-
-    /**
-     * @param period
-     *            the period to set
-     */
-    public void setPeriod(long period) {
-        this.period = period;
-    }
-
-    /**
-     * @return the cronEntry
-     */
-    public synchronized String getCronEntry() {
-        return this.cronEntry;
-    }
-
-    /**
-     * @param cronEntry
-     *            the cronEntry to set
-     */
-    public synchronized void setCronEntry(String cronEntry) {
-        this.cronEntry = cronEntry;
-    }
-
-    /**
-     * @return if this JobLocation represents a cron entry.
-     */
-    public boolean isCron() {
-        return getCronEntry() != null && getCronEntry().length() > 0;
-    }
-
-    /**
-     * @return the delay
-     */
-    public long getDelay() {
-        return this.delay;
-    }
-
-    /**
-     * @param delay
-     *            the delay to set
-     */
-    public void setDelay(long delay) {
-        this.delay = delay;
-    }
-
-    /**
-     * @return the location
-     */
-    public Location getLocation() {
-        return this.location;
-    }
-
-    @Override
-    public String toString() {
-        return "Job [id=" + jobId + ", startTime=" + new Date(startTime) +
-               ", delay=" + delay + ", period=" + period +
-               ", repeat=" + repeat + ", nextTime=" + new Date(nextTime) + "]";
-    }
-
-    static class JobLocationMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
-        static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
-
-        @Override
-        public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
-            List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                LegacyJobLocation jobLocation = new LegacyJobLocation();
-                jobLocation.readExternal(dataIn);
-                result.add(jobLocation);
-            }
-            return result;
-        }
-
-        @Override
-        public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (LegacyJobLocation jobLocation : value) {
-                jobLocation.writeExternal(dataOut);
-            }
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode());
-        result = prime * result + (int) (delay ^ (delay >>> 32));
-        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
-        result = prime * result + ((location == null) ? 0 : location.hashCode());
-        result = prime * result + (int) (nextTime ^ (nextTime >>> 32));
-        result = prime * result + (int) (period ^ (period >>> 32));
-        result = prime * result + repeat;
-        result = prime * result + (int) (startTime ^ (startTime >>> 32));
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        if (obj == null) {
-            return false;
-        }
-
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-
-        LegacyJobLocation other = (LegacyJobLocation) obj;
-
-        if (cronEntry == null) {
-            if (other.cronEntry != null) {
-                return false;
-            }
-        } else if (!cronEntry.equals(other.cronEntry)) {
-            return false;
-        }
-
-        if (delay != other.delay) {
-            return false;
-        }
-
-        if (jobId == null) {
-            if (other.jobId != null)
-                return false;
-        } else if (!jobId.equals(other.jobId)) {
-            return false;
-        }
-
-        if (location == null) {
-            if (other.location != null) {
-                return false;
-            }
-        } else if (!location.equals(other.location)) {
-            return false;
-        }
-
-        if (nextTime != other.nextTime) {
-            return false;
-        }
-        if (period != other.period) {
-            return false;
-        }
-        if (repeat != other.repeat) {
-            return false;
-        }
-        if (startTime != other.startTime) {
-            return false;
-        }
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
deleted file mode 100644
index 687ffd7..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-
-/**
- * Read-only view of a stored legacy JobScheduler instance.
- */
-final class LegacyJobSchedulerImpl extends ServiceSupport {
-
-    private final LegacyJobSchedulerStoreImpl store;
-    private String name;
-    private BTreeIndex<Long, List<LegacyJobLocation>> index;
-
-    LegacyJobSchedulerImpl(LegacyJobSchedulerStoreImpl store) {
-        this.store = store;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return this.name;
-    }
-
-    /**
-     * Returns the next time that a job would be scheduled to run.
-     *
-     * @return time of next scheduled job to run.
-     *
-     * @throws IOException if an error occurs while fetching the time.
-     */
-    public long getNextScheduleTime() throws IOException {
-        Map.Entry<Long, List<LegacyJobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
-        return first != null ? first.getKey() : -1l;
-    }
-
-    /**
-     * Gets the list of the next batch of scheduled jobs in the store.
-     *
-     * @return a list of the next jobs that will run.
-     *
-     * @throws IOException if an error occurs while fetching the jobs list.
-     */
-    public List<LegacyJobImpl> getNextScheduleJobs() throws IOException {
-        final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
-
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            @Override
-            public void execute(Transaction tx) throws IOException {
-                Map.Entry<Long, List<LegacyJobLocation>> first = index.getFirst(store.getPageFile().tx());
-                if (first != null) {
-                    for (LegacyJobLocation jl : first.getValue()) {
-                        ByteSequence bs = getPayload(jl.getLocation());
-                        LegacyJobImpl job = new LegacyJobImpl(jl, bs);
-                        result.add(job);
-                    }
-                }
-            }
-        });
-        return result;
-    }
-
-    /**
-     * Gets a list of all scheduled jobs in this store.
-     *
-     * @return a list of all the currently scheduled jobs in this store.
-     *
-     * @throws IOException if an error occurs while fetching the list of jobs.
-     */
-    public List<LegacyJobImpl> getAllJobs() throws IOException {
-        final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            @Override
-            public void execute(Transaction tx) throws IOException {
-                Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx());
-                while (iter.hasNext()) {
-                    Map.Entry<Long, List<LegacyJobLocation>> next = iter.next();
-                    if (next != null) {
-                        for (LegacyJobLocation jl : next.getValue()) {
-                            ByteSequence bs = getPayload(jl.getLocation());
-                            LegacyJobImpl job = new LegacyJobImpl(jl, bs);
-                            result.add(job);
-                        }
-                    } else {
-                        break;
-                    }
-                }
-            }
-        });
-        return result;
-    }
-
-    /**
-     * Gets a list of all scheduled jobs that exist between the given start and end time.
-     *
-     * @param start
-     *      The start time to look for scheduled jobs.
-     * @param finish
-     *      The end time to stop looking for scheduled jobs.
-     *
-     * @return a list of all scheduled jobs that would run between the given start and end time.
-     *
-     * @throws IOException if an error occurs while fetching the list of jobs.
-     */
-    public List<LegacyJobImpl> getAllJobs(final long start, final long finish) throws IOException {
-        final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            @Override
-            public void execute(Transaction tx) throws IOException {
-                Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
-                while (iter.hasNext()) {
-                    Map.Entry<Long, List<LegacyJobLocation>> next = iter.next();
-                    if (next != null && next.getKey().longValue() <= finish) {
-                        for (LegacyJobLocation jl : next.getValue()) {
-                            ByteSequence bs = getPayload(jl.getLocation());
-                            LegacyJobImpl job = new LegacyJobImpl(jl, bs);
-                            result.add(job);
-                        }
-                    } else {
-                        break;
-                    }
-                }
-            }
-        });
-        return result;
-    }
-
-    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
-        return this.store.getPayload(location);
-    }
-
-    @Override
-    public String toString() {
-        return "LegacyJobScheduler: " + this.name;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-    }
-
-    void createIndexes(Transaction tx) throws IOException {
-        this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
-    }
-
-    void load(Transaction tx) throws IOException {
-        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
-        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
-        this.index.load(tx);
-    }
-
-    void read(DataInput in) throws IOException {
-        this.name = in.readUTF();
-        this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), in.readLong());
-        this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
-        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
-    }
-
-    public void write(DataOutput out) throws IOException {
-        out.writeUTF(name);
-        out.writeLong(this.index.getPageId());
-    }
-
-    static class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
-        static ValueMarshaller INSTANCE = new ValueMarshaller();
-
-        @Override
-        public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
-            List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                LegacyJobLocation jobLocation = new LegacyJobLocation();
-                jobLocation.readExternal(dataIn);
-                result.add(jobLocation);
-            }
-            return result;
-        }
-
-        @Override
-        public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (LegacyJobLocation jobLocation : value) {
-                jobLocation.writeExternal(dataOut);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
deleted file mode 100644
index acbd4e7..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read-only view of a legacy JobSchedulerStore implementation.
- */
-final class LegacyJobSchedulerStoreImpl extends ServiceSupport {
-
-    static final Logger LOG = LoggerFactory.getLogger(LegacyJobSchedulerStoreImpl.class);
-
-    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
-    private File directory;
-    private PageFile pageFile;
-    private Journal journal;
-    private LockFile lockFile;
-    private final AtomicLong journalSize = new AtomicLong(0);
-    private boolean failIfDatabaseIsLocked;
-    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
-    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
-    private boolean enableIndexWriteAsync = false;
-    private MetaData metaData = new MetaData(this);
-    private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
-    private final Map<String, LegacyJobSchedulerImpl> schedulers = new HashMap<String, LegacyJobSchedulerImpl>();
-
-    protected class MetaData {
-        protected MetaData(LegacyJobSchedulerStoreImpl store) {
-            this.store = store;
-        }
-
-        private final LegacyJobSchedulerStoreImpl store;
-        Page<MetaData> page;
-        BTreeIndex<Integer, Integer> journalRC;
-        BTreeIndex<String, LegacyJobSchedulerImpl> storedSchedulers;
-
-        void createIndexes(Transaction tx) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, tx.allocate().getPageId());
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
-        }
-
-        void load(Transaction tx) throws IOException {
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.storedSchedulers.load(tx);
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.load(tx);
-        }
-
-        void loadScheduler(Transaction tx, Map<String, LegacyJobSchedulerImpl> schedulers) throws IOException {
-            for (Iterator<Entry<String, LegacyJobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
-                Entry<String, LegacyJobSchedulerImpl> entry = i.next();
-                entry.getValue().load(tx);
-                schedulers.put(entry.getKey(), entry.getValue());
-            }
-        }
-
-        public void read(DataInput is) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, is.readLong());
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-        }
-
-        public void write(DataOutput os) throws IOException {
-            os.writeLong(this.storedSchedulers.getPageId());
-            os.writeLong(this.journalRC.getPageId());
-        }
-    }
-
-    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
-        private final LegacyJobSchedulerStoreImpl store;
-
-        MetaDataMarshaller(LegacyJobSchedulerStoreImpl store) {
-            this.store = store;
-        }
-
-        @Override
-        public MetaData readPayload(DataInput dataIn) throws IOException {
-            MetaData rc = new MetaData(this.store);
-            rc.read(dataIn);
-            return rc;
-        }
-
-        @Override
-        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
-            object.write(dataOut);
-        }
-    }
-
-    class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
-        @Override
-        public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
-            List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                LegacyJobLocation jobLocation = new LegacyJobLocation();
-                jobLocation.readExternal(dataIn);
-                result.add(jobLocation);
-            }
-            return result;
-        }
-
-        @Override
-        public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (LegacyJobLocation jobLocation : value) {
-                jobLocation.writeExternal(dataOut);
-            }
-        }
-    }
-
-    class JobSchedulerMarshaller extends VariableMarshaller<LegacyJobSchedulerImpl> {
-        private final LegacyJobSchedulerStoreImpl store;
-
-        JobSchedulerMarshaller(LegacyJobSchedulerStoreImpl store) {
-            this.store = store;
-        }
-
-        @Override
-        public LegacyJobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
-            LegacyJobSchedulerImpl result = new LegacyJobSchedulerImpl(this.store);
-            result.read(dataIn);
-            return result;
-        }
-
-        @Override
-        public void writePayload(LegacyJobSchedulerImpl js, DataOutput dataOut) throws IOException {
-            js.write(dataOut);
-        }
-    }
-
-    public File getDirectory() {
-        return directory;
-    }
-
-    public void setDirectory(File directory) {
-        this.directory = directory;
-    }
-
-    public long size() {
-        if (!isStarted()) {
-            return 0;
-        }
-        try {
-            return journalSize.get() + pageFile.getDiskSize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Returns the named Job Scheduler if it exists, otherwise throws an exception.
-     *
-     * @param name
-     *     The name of the scheduler that is to be returned.
-     *
-     * @return the named scheduler if it exists.
-     *
-     * @throws Exception if the named scheduler does not exist in this store.
-     */
-    public LegacyJobSchedulerImpl getJobScheduler(final String name) throws Exception {
-        LegacyJobSchedulerImpl result = this.schedulers.get(name);
-        if (result == null) {
-            throw new NoSuchElementException("No such Job Scheduler in this store: " + name);
-        }
-        return result;
-    }
-
-    /**
-     * Returns the names of all the schedulers that exist in this scheduler store.
-     *
-     * @return a set of names of all scheduler instances in this store.
-     *
-     * @throws Exception if an error occurs while collecting the scheduler names.
-     */
-    public Set<String> getJobSchedulerNames() throws Exception {
-        Set<String> names = Collections.emptySet();
-
-        if (!schedulers.isEmpty()) {
-            return this.schedulers.keySet();
-        }
-
-        return names;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        if (this.directory == null) {
-            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
-        }
-        IOHelper.mkdirs(this.directory);
-        lock();
-        this.journal = new Journal();
-        this.journal.setDirectory(directory);
-        this.journal.setMaxFileLength(getJournalMaxFileLength());
-        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
-        this.journal.setSizeAccumulator(this.journalSize);
-        this.journal.start();
-        this.pageFile = new PageFile(directory, "scheduleDB");
-        this.pageFile.setWriteBatchSize(1);
-        this.pageFile.load();
-
-        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            @Override
-            public void execute(Transaction tx) throws IOException {
-                if (pageFile.getPageCount() == 0) {
-                    Page<MetaData> page = tx.allocate();
-                    assert page.getPageId() == 0;
-                    page.set(metaData);
-                    metaData.page = page;
-                    metaData.createIndexes(tx);
-                    tx.store(metaData.page, metaDataMarshaller, true);
-
-                } else {
-                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
-                    metaData = page.get();
-                    metaData.page = page;
-                }
-                metaData.load(tx);
-                metaData.loadScheduler(tx, schedulers);
-                for (LegacyJobSchedulerImpl js : schedulers.values()) {
-                    try {
-                        js.start();
-                    } catch (Exception e) {
-                        LegacyJobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
-                    }
-                }
-            }
-        });
-
-        this.pageFile.flush();
-        LOG.info(this + " started");
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        for (LegacyJobSchedulerImpl js : this.schedulers.values()) {
-            js.stop();
-        }
-        if (this.pageFile != null) {
-            this.pageFile.unload();
-        }
-        if (this.journal != null) {
-            journal.close();
-        }
-        if (this.lockFile != null) {
-            this.lockFile.unlock();
-        }
-        this.lockFile = null;
-        LOG.info(this + " stopped");
-    }
-
-    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
-        ByteSequence result = null;
-        result = this.journal.read(location);
-        return result;
-    }
-
-    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
-        return this.journal.write(payload, sync);
-    }
-
-    private void lock() throws IOException {
-        if (lockFile == null) {
-            File lockFileName = new File(directory, "lock");
-            lockFile = new LockFile(lockFileName, true);
-            if (failIfDatabaseIsLocked) {
-                lockFile.lock();
-            } else {
-                while (true) {
-                    try {
-                        lockFile.lock();
-                        break;
-                    } catch (IOException e) {
-                        LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
-                            + " seconds for the database to be unlocked. Reason: " + e);
-                        try {
-                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
-                        } catch (InterruptedException e1) {
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    PageFile getPageFile() {
-        this.pageFile.isLoaded();
-        return this.pageFile;
-    }
-
-    public boolean isFailIfDatabaseIsLocked() {
-        return failIfDatabaseIsLocked;
-    }
-
-    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
-        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
-    }
-
-    public int getJournalMaxFileLength() {
-        return journalMaxFileLength;
-    }
-
-    public void setJournalMaxFileLength(int journalMaxFileLength) {
-        this.journalMaxFileLength = journalMaxFileLength;
-    }
-
-    public int getJournalMaxWriteBatchSize() {
-        return journalMaxWriteBatchSize;
-    }
-
-    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
-        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
-    }
-
-    public boolean isEnableIndexWriteAsync() {
-        return enableIndexWriteAsync;
-    }
-
-    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
-        this.enableIndexWriteAsync = enableIndexWriteAsync;
-    }
-
-    @Override
-    public String toString() {
-        return "LegacyJobSchedulerStore:" + this.directory;
-    }
-}


Mime
View raw message