activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633639 [4/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active...
Date Tue, 04 Mar 2008 21:01:57 GMT
Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.broker.router.store.journal.data.AddMessageJournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.Journal;
+import org.apache.activemq.broker.router.store.journal.data.RemoveMessageJournalEntry;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * 
+ * @author chirino
+ */
+public class JournalDataStore extends JournalStore implements DataStore {
+    private final static Log LOG = LogFactory.getLog(JournalDataStore.class);
+
+    /**
+     * Manager for the reference stores that belong to this data store. Every
+     * store it creates is a JournalReferenceStore wrapping a ReferenceIndex
+     * obtained from this data store's index.
+     */
+    private final JournalStoreManager<ReferenceStore> stores = new JournalStoreManager<ReferenceStore>() {
+        /** Adds a reference index under the given name and wraps it in a new store. */
+        @Override
+        protected ReferenceStore createStore(String name) throws Exception {
+            ReferenceIndex created = index.addStore(name);
+            return new JournalReferenceStore(JournalDataStore.this, created);
+        }
+
+        /** Removes the index backing the given store from the data index. */
+        @Override
+        protected void destroyStore(ReferenceStore store) throws Exception {
+            index.removeStore(((JournalReferenceStore) store).getIndex());
+        }
+
+        /** Wraps every reference index already known to the data index, keyed by name. */
+        @Override
+        protected HashMap<String, ReferenceStore> createExistingStores() throws Exception {
+            HashMap<String, ReferenceStore> existing = new HashMap<String, ReferenceStore>();
+            List<ReferenceIndex> known = index.getStores();
+            for (ReferenceIndex refIndex : known) {
+                existing.put(refIndex.getName(), new JournalReferenceStore(JournalDataStore.this, refIndex));
+            }
+            return existing;
+        }
+
+    };
+
+    public class JournalCacheEntry implements CacheEntry {
+
+        public final IndexEntry indexEntry;
+        // Tracks load requests by the broker.. keeps the cache entry in memory
+        private int loadCounter;
+        public Message message;
+        public boolean locked;
+        public boolean dropped;
+        public boolean dirty; // This cache entry could be dirty.
+
+        /**
+         * Creates a cache entry for an index record.
+         *
+         * @param indexEntry the index record this entry represents
+         * @param message the message body, or null if it is not in memory; an
+         *            entry created without a message starts out dirty
+         */
+        public JournalCacheEntry(IndexEntry indexEntry, Message message) {
+            this.dirty = (message == null);
+            this.message = message;
+            this.indexEntry = indexEntry;
+        }
+
+        /**
+         * Returns the message for this entry, reading it from the journal on
+         * demand when it is not held in memory.
+         * <p>
+         * A successful on-demand load is charged against the memory usage
+         * tracker. If the journal read fails, the entry is marked dropped and
+         * null is returned.
+         *
+         * @return the message, or null if the entry was dropped or the message
+         *         could not be loaded
+         */
+        synchronized public Message getMessage() {
+            if (message != null) {
+                return message;
+            }
+            if (dropped) {
+                return null;
+            }
+            try {
+                // This may not work since the record may have been dropped.
+                message = journal.loadData(indexEntry.getLocation());
+            } catch (Throwable e) {
+                LOG.warn("Could not load message " + indexEntry.getId() + " from the journal: " + e, e);
+                dropped = true;
+                return null;
+            }
+            // Only account for memory when something was actually loaded;
+            // dereferencing a null message here used to throw.
+            if (message != null) {
+                int size = message.getSize();
+                systemUsage.getMemoryUsage().increaseUsage(size);
+            }
+            return message;
+        }
+
+        /** Returns the id of the index entry this cache entry represents. */
+        public long getId() {
+            return this.indexEntry.getId();
+        }
+
+        /** Does nothing in this implementation; no redelivery count is kept here. */
+        public void incrementRedeliveryCounter() {
+            // intentionally empty
+        }
+
+        /** Returns the enclosing data store that owns this cache entry. */
+        public DataStore getStore() {
+            return JournalDataStore.this;
+        }
+
+        /**
+         * Registers one more load request against this entry. When this is the
+         * first outstanding request and the message is already in memory, its
+         * size is added to the memory usage tracker.
+         */
+        public synchronized void load() {
+            boolean firstLoadOfResidentMessage = (message != null) && (loadCounter == 0);
+            if (firstLoadOfResidentMessage) {
+                int bytes = message.getSize();
+                systemUsage.getMemoryUsage().increaseUsage(bytes);
+            }
+            loadCounter += 1;
+        }
+
+        /**
+         * Releases one load request. Once no requests remain, the in-memory
+         * message (if any) is released and its size given back to the memory
+         * usage tracker, the entry is marked dirty, and an unlocked entry is
+         * evicted from the cache.
+         */
+        public synchronized void unload() {
+            loadCounter--;
+            assert loadCounter >= 0;
+            if (loadCounter >= 1) {
+                // Someone else still has this entry loaded.
+                return;
+            }
+            if (message != null) {
+                systemUsage.getMemoryUsage().decreaseUsage(message.getSize());
+                message = null;
+                dirty = true;
+            }
+            if (locked) {
+                // Locked entries stay in the cache.
+                return;
+            }
+            cache.remove(indexEntry.getId());
+        }
+
+        synchronized public boolean lock() {
+            assert loadCounter >= 0; // You should only lock loaded cache
+                                        // entries.
+            if (locked || dropped)
+                return false;
+
+            // locking is used to exclusively consume a message. If we are dirty
+            // then it
+            // means that this cache record is not the original which may have
+            // already been locked
+            // and dropped. Since we do dirty reads from the index, it's
+            // possible we created
+            // this cache entry after that guy was deleted. We need to check the
+            // index to see if
+            // he is still around.
+            if (dirty) {
+                // Verify the record still exists in the index. Now that this
+                // record is in the cache,
+                // we are cleaning up it's state.
+                try {
+                    if (!index.contains(indexEntry.getId())) {
+                        dirty = false;
+                        droppped();
+                        return false;
+                    }
+                    if (getMessage() == null) {
+                        return false;
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            assert message != null : "Once locked the message should not be null.";
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LOCKED: " + indexEntry.getId());
+            }
+
+            return locked = true;
+        }
+
+        /**
+         * Marks this entry as dropped: clears the lock, releases the in-memory
+         * message (giving its size back to the memory usage tracker) and evicts
+         * the entry from the cache.
+         */
+        public synchronized void droppped() {
+            locked = false;
+            dropped = true;
+            if (message != null) {
+                systemUsage.getMemoryUsage().decreaseUsage(message.getSize());
+                message = null;
+            }
+            cache.remove(indexEntry.getId());
+        }
+    }
+
+    private final AtomicReference<JournalCacheEntry> lastAddedEntry = new AtomicReference<JournalCacheEntry>();
+
+    ConcurrentHashMap<Long, JournalCacheEntry> cache = new ConcurrentHashMap<Long, JournalCacheEntry>();
+    private SystemUsage systemUsage;
+    private Destination destination;
+    private final DataIndex index;
+
+    private final JournalDataStoreManager dataStoreManager;
+    private Journal journal;
+
+    public JournalDataStore(JournalDataStoreManager dataStoreManager, DataIndex index) throws Exception {
+        super(index);
+        this.dataStoreManager = dataStoreManager;
+        this.journal = dataStoreManager.getJournal();
+        this.index = index;
+
+        IndexEntry entry = index.getLastAddedId();
+        if (entry != null) {
+            this.lastAddedEntry.set(new JournalCacheEntry(entry, null));
+        }
+    }
+
+    /**
+     * Puts a newly added message into the cache, or reuses the entry already
+     * cached for the same id, registers a load on it and records it as the
+     * last added entry.
+     *
+     * @return the entry that ended up in the cache
+     */
+    private JournalCacheEntry cacheAdd(IndexEntry indexEntry, Message message) {
+        JournalCacheEntry fresh = new JournalCacheEntry(indexEntry, message);
+        JournalCacheEntry existing = cache.putIfAbsent(indexEntry.getId(), fresh);
+        JournalCacheEntry winner = (existing == null) ? fresh : existing;
+        winner.load();
+        lastAddedEntry.set(winner);
+        return winner;
+    }
+
+    /**
+     * Returns the cache entry for the given index entry, creating one without
+     * a message if none is cached yet, and registers a load on it.
+     *
+     * @param indexEntry the index record to load
+     * @return the cached entry for that record
+     */
+    JournalCacheEntry load(IndexEntry indexEntry) throws IllegalStateException, IOException {
+        JournalCacheEntry placeholder = new JournalCacheEntry(indexEntry, null);
+        JournalCacheEntry existing = cache.putIfAbsent(indexEntry.getId(), placeholder);
+        JournalCacheEntry entry = (existing == null) ? placeholder : existing;
+        entry.load();
+        return entry;
+    }
+
+    /**
+     * Adds a message outside of any transaction: writes it to the journal,
+     * records its journal location in the index, then caches it.
+     *
+     * @param id the message id
+     * @param message the message to store
+     * @param onCompleted callback handed to the journal write
+     * @return the cache entry for the stored message
+     */
+    public JournalCacheEntry addMessage(long id, Message message, Runnable onCompleted) throws Exception {
+        AddMessageJournalEntry written = journal.addData(getName(), id, message, null, onCompleted);
+        IndexEntry indexed = index.addMessage(id, written.getLocation());
+        return cacheAdd(indexed, message);
+    }
+
+    public AddMessageTransactionAction addTransactedMessage(long id, Message message, TransactionId tx, Runnable onCompleted) throws Exception {
+        final AddMessageJournalEntry rc = journal.addData(getName(), id, message, tx, onCompleted);
+        return createAddMessageTransactionAction(rc);
+    }
+
+    /**
+     * Builds the deferred action for a journaled, transacted add. Completing
+     * the action adds the message to the index and to the cache.
+     *
+     * @param rc the journal entry describing the add
+     * @return an action bound to this data store
+     */
+    public AddMessageTransactionAction createAddMessageTransactionAction(final AddMessageJournalEntry rc) {
+        return new AddMessageTransactionAction() {
+            public TransactionId getTransactionId() {
+                return rc.getTransactionId();
+            }
+
+            public DataStore getDataStore() {
+                return JournalDataStore.this;
+            }
+
+            public Message getMessage() {
+                return rc.getMessage();
+            }
+
+            public CacheEntry complete() throws Exception {
+                final IndexEntry indexed = index.addMessage(rc.getId(), rc.getLocation());
+                return cacheAdd(indexed, rc.getMessage());
+            }
+        };
+    }
+
+    /**
+     * Loads up to {@code max} entries from the index between the two given
+     * cache entries and returns their cache entries, each with a load
+     * registered on it.
+     *
+     * @param firstCE lower bound passed to the index, or null
+     * @param lastCE upper bound passed to the index, or null
+     * @param max maximum number of entries requested from the index
+     * @return the cache entries, in the order the index returned them
+     */
+    public synchronized List<CacheEntry> load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception {
+        JournalCacheEntry lower = (JournalCacheEntry) firstCE;
+        JournalCacheEntry upper = (JournalCacheEntry) lastCE;
+
+        IndexEntry lowerIE = null;
+        if (lower != null) {
+            lowerIE = lower.indexEntry;
+        }
+        IndexEntry upperIE = null;
+        if (upper != null) {
+            upperIE = upper.indexEntry;
+        }
+
+        List<IndexEntry> found = index.load(lowerIE, upperIE, max);
+        ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(found.size());
+        for (IndexEntry each : found) {
+            result.add(load(each));
+        }
+        return result;
+    }
+
+    /**
+     * Removes a message outside of any transaction: journals the removal,
+     * deletes the record from the index and drops any cached entry for it.
+     *
+     * @param id the message id
+     * @param onCompleted callback handed to the journal write
+     */
+    public void remove(long id, Runnable onCompleted) throws Exception {
+        journal.removeData(getName(), id, null, onCompleted);
+        index.remove(id);
+
+        JournalCacheEntry cached = cache.get(id);
+        if (cached == null) {
+            return;
+        }
+        cached.droppped();
+    }
+
+    public RemoveMesageTransactionAction removeTransacted(long id, TransactionId tx, Runnable onCompleted) throws Exception {
+        final RemoveMessageJournalEntry removeData = journal.removeData(getName(), id, tx, onCompleted);
+        return createRemoveMesageTransactionAction(removeData);
+    }
+
+    /**
+     * Builds the deferred action for a journaled, transacted removal.
+     * Completing the action removes the record from the index.
+     * <p>
+     * NOTE(review): unlike the non-transacted remove, completing this action
+     * does not drop the cached entry - confirm whether that is intended.
+     *
+     * @param removeData the journal entry describing the removal
+     * @return an action bound to this data store
+     */
+    public RemoveMesageTransactionAction createRemoveMesageTransactionAction(final RemoveMessageJournalEntry removeData) {
+        return new RemoveMesageTransactionAction() {
+            public TransactionId getTransactionId() {
+                return removeData.getTransactionId();
+            }
+
+            public DataStore getDataStore() {
+                return JournalDataStore.this;
+            }
+
+            public long getId() {
+                return removeData.getId();
+            }
+
+            public void complete() throws Exception {
+                index.remove(removeData.getId());
+            }
+        };
+    }
+
+    public void removeUnreferencedRecords(CacheEntry until) throws Exception {
+        index.removeUnreferencedRecords(((JournalCacheEntry) until).indexEntry);
+    }
+
+    /** Returns the size reported by the underlying index. */
+    public long size() throws Exception {
+        return this.index.size();
+    }
+
+    /**
+     * Returns the most recently recorded last-added cache entry, or null if
+     * none has been recorded.
+     */
+    public CacheEntry getLastAddedEntry() throws Exception {
+        return this.lastAddedEntry.get();
+    }
+
+    /** Returns the destination last assigned through setDestination. */
+    public Destination getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * Binds this store to a destination and adopts that destination's system
+     * usage for memory accounting.
+     *
+     * @param destination the destination served by this store; must not be null
+     */
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+        // Cache entries charge their message sizes against this usage tracker.
+        this.systemUsage = destination.getSystemUsage();
+    }
+
+    /** Returns the index backing this data store. */
+    public DataIndex getIndex() {
+        return this.index;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // StoreManager related methods
+    // /////////////////////////////////////////////////////////////////
+    /** Delegates to the reference store manager to add a store with the given name. */
+    public ReferenceStore addStore(String name) throws Exception {
+        return this.stores.addStore(name);
+    }
+
+    /** Delegates to the reference store manager to look up the store with the given name. */
+    public ReferenceStore getStore(String name) throws Exception {
+        return this.stores.getStore(name);
+    }
+
+    /** Returns the reference stores reported by the reference store manager. */
+    public List<ReferenceStore> getStores() throws Exception {
+        return this.stores.getStores();
+    }
+
+    /** Delegates to the reference store manager to remove the given store. */
+    public void removeStore(ReferenceStore store) throws Exception {
+        this.stores.removeStore(store);
+    }
+
+    /** Does nothing in this implementation; the flag is ignored. */
+    public void setAutoRemove(boolean enable) {
+        // intentionally empty
+    }
+
+    /** Returns the manager this data store was created with. */
+    public JournalDataStoreManager getDataStoreManager() {
+        return this.dataStoreManager;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.DataIndexManager;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.broker.router.store.journal.data.AddMessageJournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.Journal;
+import org.apache.activemq.broker.router.store.journal.data.JournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.RemoveMessageJournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.RemoveReferenceJournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.StoreTxJournalEntry;
+import org.apache.activemq.broker.router.store.journal.data.TxJournalEntry;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * 
+ * @author chirino
+ */
+public class JournalDataStoreManager implements DataStoreManager {
+
+    private static final Log LOG = LogFactory.getLog(JournalDataStoreManager.class);
+
+    /**
+     * Manager for the data stores owned by this journal. Every store it
+     * creates is a JournalDataStore wrapping a DataIndex obtained from the
+     * data index manager.
+     */
+    private final JournalStoreManager<DataStore> stores = new JournalStoreManager<DataStore>() {
+        /** Adds a data index under the given name and wraps it in a new store. */
+        @Override
+        protected DataStore createStore(String name) throws Exception {
+            DataIndex created = dataIndexManager.addStore(name);
+            return new JournalDataStore(JournalDataStoreManager.this, created);
+        }
+
+        /** Removes the index backing the given store from the data index manager. */
+        @Override
+        protected void destroyStore(DataStore store) throws Exception {
+            dataIndexManager.removeStore(((JournalDataStore) store).getIndex());
+        }
+
+        /** Wraps every data index already known to the index manager, keyed by name. */
+        @Override
+        protected HashMap<String, DataStore> createExistingStores() throws Exception {
+            HashMap<String, DataStore> existing = new HashMap<String, DataStore>();
+            List<DataIndex> known = dataIndexManager.getStores();
+            for (DataIndex dataIndex : known) {
+                existing.put(dataIndex.getName(), new JournalDataStore(JournalDataStoreManager.this, dataIndex));
+            }
+            return existing;
+        }
+    };
+
+    private WireFormat wireFormat;
+    private AsyncDataManager asyncDataManager;
+    private DataIndexManager dataIndexManager;
+    private boolean deleteAllMessages;
+    private Journal journal;
+    private Set<TransactionId> preparedTransactions = new HashSet<TransactionId>();
+    private Map<TransactionId, List<StoreTxJournalEntry>> inFlightTransactions = new HashMap<TransactionId, List<StoreTxJournalEntry>>();
+    private Timer cleanup = new Timer();
+    long cleanupInterval = 1000 * 60;
+
+    /**
+     * Starts the journal: validates the required properties, starts the data
+     * manager and the index manager, optionally deletes all existing data,
+     * replays the journal and finally schedules the periodic cleanup task.
+     *
+     * @throws IllegalStateException if wireFormat, asyncDataManager or
+     *             dataIndexManager has not been set
+     * @throws Exception if starting a component or recovering the journal fails
+     */
+    public void start() throws Exception {
+
+        if (wireFormat == null) {
+            throw new IllegalStateException("The wireFormat property must be specified.");
+        }
+        if (asyncDataManager == null) {
+            throw new IllegalStateException("The asyncDataManager property must be specified.");
+        }
+        // This check used to test asyncDataManager a second time, so a missing
+        // dataIndexManager surfaced later as a NullPointerException.
+        if (dataIndexManager == null) {
+            throw new IllegalStateException("The dataIndexManager property must be specified.");
+        }
+
+        LOG.info("Journal starting");
+        asyncDataManager.start();
+        dataIndexManager.start();
+
+        journal = new Journal(asyncDataManager, wireFormat);
+        if (deleteAllMessages) {
+            LOG.info("Deleting existing messages data..");
+            journal.trace("DELETING " + new Date());
+            dataIndexManager.clear();
+            asyncDataManager.delete();
+            journal.trace("DELETED " + new Date());
+            // Only wipe once; a later restart must keep the data.
+            deleteAllMessages = false;
+        }
+
+        recover();
+        LOG.info("Journal started");
+
+        // A Timer starts its thread when constructed, so cancel the one being
+        // replaced (from the field initializer or a previous start) to avoid
+        // leaving its thread behind.
+        if (cleanup != null) {
+            cleanup.cancel();
+        }
+        cleanup = new Timer("Journal Cleanup");
+        cleanup.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                cleanup();
+            }
+        }, 1, cleanupInterval);
+    }
+
+    /**
+     * Periodic housekeeping: asks the data manager to consolidate data files
+     * that the index no longer references. Failures are logged, not thrown.
+     */
+    protected void cleanup() {
+        try {
+            // Capture the current data file id first. Collecting the in-use
+            // set can take a while, and the journal may rotate to a new data
+            // file that the set does not include.
+            Integer currentFileId = asyncDataManager.getCurrentDataFileId();
+            Set<Integer> referencedFileIds = dataIndexManager.getDataFileIdsInUse();
+            // TODO: add all file ids of operations in txs.. they will not be
+            // tracked by the index manager.
+            // Per the original author's note, this deletes all data files that
+            // are not in use and come before the captured data file id.
+            asyncDataManager.consolidateDataFilesNotIn(referencedFileIds, currentFileId);
+        } catch (Exception e) {
+            LOG.error("Could not cleanup data files: " + e, e);
+        }
+    }
+
+    /**
+     * Stops the journal: cancels the cleanup timer, syncs and stops the index
+     * manager, writes a final trace record, syncs the journal and closes the
+     * data manager.
+     * <p>
+     * NOTE(review): this dereferences the journal, which is only assigned in
+     * start() - presumably stop() must only be called after a successful
+     * start(); confirm against callers.
+     */
+    public void stop() throws Exception {
+        // Stop scheduling cleanup runs before tearing anything down.
+        cleanup.purge();
+        cleanup.cancel();
+
+        dataIndexManager.sync();
+        dataIndexManager.stop();
+        journal.trace("STOPPED " + new Date());
+        journal.synch();
+        asyncDataManager.close();
+        // Drop the reference; start() creates a new Journal.
+        journal = null;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // StoreManager related methods
+    // /////////////////////////////////////////////////////////////////
+
+    /** Delegates to the data store manager to add a store with the given name. */
+    public DataStore addStore(String name) throws Exception {
+        return this.stores.addStore(name);
+    }
+
+    /** Delegates to the data store manager to look up the store with the given name. */
+    public DataStore getStore(String name) throws Exception {
+        return this.stores.getStore(name);
+    }
+
+    /** Returns the data stores reported by the data store manager. */
+    public List<DataStore> getStores() throws Exception {
+        return this.stores.getStores();
+    }
+
+    /** Delegates to the data store manager to remove the given store. */
+    public void removeStore(DataStore store) throws Exception {
+        this.stores.removeStore(store);
+    }
+
+    // /////////////////////////////////////////////////////////
+    // Journal Methods
+    // /////////////////////////////////////////////////////////
+
+    /** Returns the data manager that holds the journal's data files. */
+    public AsyncDataManager getAsyncDataManager() {
+        return this.asyncDataManager;
+    }
+
+    /** Sets the data manager; start() requires it to be non-null. */
+    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+        this.asyncDataManager = asyncDataManager;
+    }
+
+    /** Returns the index manager used by this journal. */
+    public DataIndexManager getDataIndexManager() {
+        return this.dataIndexManager;
+    }
+
+    /** Sets the index manager that start(), stop() and cleanup() operate on. */
+    public void setDataIndexManager(DataIndexManager dataIndexManager) {
+        this.dataIndexManager = dataIndexManager;
+    }
+
+    /** Returns whether the next start() will delete all existing data. */
+    public boolean isDeleteAllMessages() {
+        return this.deleteAllMessages;
+    }
+
+    /**
+     * Requests that the next start() delete all existing data. start() resets
+     * the flag to false once the deletion has been performed.
+     */
+    public void setDeleteAllMessages(boolean deleteAllMessages) {
+        this.deleteAllMessages = deleteAllMessages;
+    }
+
+    /**
+     * Appends a journaled operation to the list of in-flight operations of
+     * its transaction, creating that list on first use.
+     */
+    private void addTrasactionOp(TransactionId tx, StoreTxJournalEntry entry) {
+        List<StoreTxJournalEntry> operations = inFlightTransactions.get(tx);
+        if (operations != null) {
+            operations.add(entry);
+            return;
+        }
+        // First operation seen for this transaction.
+        operations = new ArrayList<StoreTxJournalEntry>();
+        inFlightTransactions.put(tx, operations);
+        operations.add(entry);
+    }
+
+    /**
+     * Rebuilds the pending actions of every prepared transaction from the
+     * journal entries collected for it.
+     * <p>
+     * Each journaled add, remove or reference-remove is turned into the
+     * matching transaction action of the store it belongs to. A prepared
+     * transaction with no recorded operations is returned with an empty
+     * action list.
+     *
+     * @return a map from prepared transaction id to its actions, in journal
+     *         order
+     * @throws Exception if a store lookup fails
+     */
+    public Map<TransactionId, List<TransactionAction>> recoverPendingTransactions() throws Exception {
+        Map<TransactionId, List<TransactionAction>> rc = new HashMap<TransactionId, List<TransactionAction>>();
+        for (TransactionId transactionId : preparedTransactions) {
+            List<StoreTxJournalEntry> journalEntries = inFlightTransactions.get(transactionId);
+            if (journalEntries == null) {
+                // Prepared without any journaled store operation; it is still
+                // pending, it just has nothing to apply. This used to throw a
+                // NullPointerException.
+                rc.put(transactionId, new ArrayList<TransactionAction>());
+                continue;
+            }
+            List<TransactionAction> actions = new ArrayList<TransactionAction>(journalEntries.size());
+            rc.put(transactionId, actions);
+            for (StoreTxJournalEntry entry : journalEntries) {
+                if (entry.getClass() == AddMessageJournalEntry.class) {
+                    final AddMessageJournalEntry je = (AddMessageJournalEntry) entry;
+                    JournalDataStore dataStore = (JournalDataStore) getStore(entry.getDatatStore());
+                    actions.add(dataStore.createAddMessageTransactionAction(je));
+                } else if (entry.getClass() == RemoveMessageJournalEntry.class) {
+                    final RemoveMessageJournalEntry je = (RemoveMessageJournalEntry) entry;
+                    JournalDataStore dataStore = (JournalDataStore) getStore(entry.getDatatStore());
+                    actions.add(dataStore.createRemoveMesageTransactionAction(je));
+                } else if (entry.getClass() == RemoveReferenceJournalEntry.class) {
+                    final RemoveReferenceJournalEntry je = (RemoveReferenceJournalEntry) entry;
+                    JournalDataStore dataStore = (JournalDataStore) getStore(entry.getDatatStore());
+                    // Reference removals are applied by the reference store
+                    // nested under the data store.
+                    JournalReferenceStore referenceStore = (JournalReferenceStore) dataStore.getStore(je.getReferenceStore());
+                    actions.add(referenceStore.createRemoveReferenceTransactionAction(je));
+                }
+            }
+        }
+        return rc;
+    }
+
+    public void recover() throws Exception {
+
+        final Object waitMutex = new Object() {
+        };
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final AtomicInteger redoCounter = new AtomicInteger();
+        final AtomicLong redoTime = new AtomicLong();
+        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+        final AtomicReference<Location> pos = new AtomicReference<Location>();
+
+        LOG.info("Journal recovery starting");
+        final Location startLocation = dataIndexManager.getLastAddLocation();
+        if (startLocation == null) {
+            LOG.debug("  Replay start position is from the beginging of the journal.");
+        } else {
+            LOG.debug("  Replay start position is from file " + startLocation.getDataFileId() + " at offset " + startLocation.getOffset() + " in the journal");
+        }
+
+        pos.set(startLocation);
+        long endDiskLocation = asyncDataManager.getDiskSize();
+        long startDiskLocation = 0;
+        if (startLocation != null) {
+            startDiskLocation = asyncDataManager.getDiskSizeUntil(startLocation);
+        }
+
+        // Run recovery in an async thread so that we can monitor it's progress.
+        Thread recoveryThread = new Thread("Recovery Thread") {
+            @Override
+            public void run() {
+                try {
+                    doRecover();
+                } catch (Exception e) {
+                    error.set(e);
+                } finally {
+                    synchronized (waitMutex) {
+                        done.set(true);
+                        waitMutex.notify();
+                    }
+                }
+            }
+
+            public void doRecover() throws Exception {
+                JournalEntry journalEntry;
+                while ((journalEntry = journal.getNextEntry(pos.get())) != null) {
+                    pos.set(journalEntry.getLocation());
+                    if (journalEntry instanceof StoreTxJournalEntry) {
+                        StoreTxJournalEntry entry = (StoreTxJournalEntry) journalEntry;
+                        if (entry.getTransactionId() != null) {
+                            addTrasactionOp(entry.getTransactionId(), entry);
+                        } else {
+                            redo(entry);
+                        }
+                    } else if (journalEntry.getClass() == TxJournalEntry.class) {
+                        TxJournalEntry entry = (TxJournalEntry) journalEntry;
+                        TransactionInfo command = entry.getTransaction();
+
+                        switch (command.getType()) {
+                        case JournalTransaction.XA_PREPARE:
+                            // Keep track that this was a prepared transaction.
+                            preparedTransactions.add(command.getTransactionId());
+                            break;
+                        case JournalTransaction.XA_COMMIT:
+                        case JournalTransaction.LOCAL_COMMIT:
+                            // Redo the operations against the indexes.
+                            preparedTransactions.remove(command.getTransactionId());
+                            List<StoreTxJournalEntry> operations = inFlightTransactions.remove(command.getTransactionId());
+                            if (operations != null) {
+                                for (StoreTxJournalEntry op : operations) {
+                                    redo(op);
+                                }
+                            }
+                            break;
+
+                        case JournalTransaction.LOCAL_ROLLBACK:
+                        case JournalTransaction.XA_ROLLBACK:
+                            preparedTransactions.remove(command.getTransactionId());
+                            inFlightTransactions.remove(command.getTransactionId());
+                            break;
+                        default:
+                            throw new IOException("Invalid journal TransactionInfo type: " + command.getType());
+                        }
+                    }
+                }
+
+                // Now get rid of any transactions that were not prepared.
+                for (Iterator<TransactionId> iterator = inFlightTransactions.keySet().iterator(); iterator.hasNext();) {
+                    TransactionId tx = iterator.next();
+                    if (!preparedTransactions.contains(tx)) {
+                        // Transaction was not prepared.. rollback.
+                        iterator.remove();
+                    }
+                }
+
+                // Record that the journal was recovered.
+                journal.trace("RECOVERED " + new Date());
+                journal.synch();
+            }
+
+            /**
+             * Replays one journal entry against the index of the store named by the entry.
+             * <p>
+             * Nothing happens when the named data store cannot be found, when the entry is
+             * of a type other than add-message / remove-message / remove-reference, or when
+             * a remove-reference entry names a reference store that cannot be found. For
+             * every index call that is made, the elapsed wall-clock milliseconds are added
+             * to <code>redoTime</code> and <code>redoCounter</code> is incremented.
+             *
+             * @param journalEntry the entry to re-apply
+             * @throws Exception whatever the store lookup or the index call throws
+             */
+            private void redo(StoreTxJournalEntry journalEntry) throws Exception {
+                JournalDataStore dataStore = (JournalDataStore) getStore(journalEntry.getDatatStore());
+                if (dataStore == null) {
+                    // The owning store is unknown, so there is no index to replay against.
+                    return;
+                }
+
+                Class<?> entryType = journalEntry.getClass();
+
+                if (entryType == AddMessageJournalEntry.class) {
+                    AddMessageJournalEntry add = (AddMessageJournalEntry) journalEntry;
+                    long startedAt = System.currentTimeMillis();
+                    dataStore.getIndex().redoAddMessage(add.getId(), add.getLocation());
+                    long finishedAt = System.currentTimeMillis();
+                    redoTime.addAndGet(finishedAt - startedAt);
+                    redoCounter.incrementAndGet();
+                    return;
+                }
+
+                if (entryType == RemoveMessageJournalEntry.class) {
+                    RemoveMessageJournalEntry removal = (RemoveMessageJournalEntry) journalEntry;
+                    long startedAt = System.currentTimeMillis();
+                    dataStore.getIndex().redoRemove(removal.getId());
+                    long finishedAt = System.currentTimeMillis();
+                    redoTime.addAndGet(finishedAt - startedAt);
+                    redoCounter.incrementAndGet();
+                    return;
+                }
+
+                if (entryType == RemoveReferenceJournalEntry.class) {
+                    RemoveReferenceJournalEntry removal = (RemoveReferenceJournalEntry) journalEntry;
+                    JournalReferenceStore referenceStore = (JournalReferenceStore) dataStore.getStore(removal.getReferenceStore());
+                    if (referenceStore == null) {
+                        // Unknown reference store: nothing is replayed and nothing is counted.
+                        return;
+                    }
+                    long startedAt = System.currentTimeMillis();
+                    referenceStore.getIndex().redoRemove(removal.getId());
+                    long finishedAt = System.currentTimeMillis();
+                    redoTime.addAndGet(finishedAt - startedAt);
+                    redoCounter.incrementAndGet();
+                }
+            }
+
+        };
+
+        recoveryThread.start();
+
+        int lastRedoCounter = 0;
+        long lastRedoTime = 0;
+        long totalDiskToScan = endDiskLocation - startDiskLocation;
+
+        while (true) {
+            synchronized (waitMutex) {
+                if (done.get()) {
+                    break;
+                }
+                waitMutex.wait(1000);
+                if (done.get()) {
+                    break;
+                }
+            }
+
+            long currentDiskLocation = asyncDataManager.getDiskSizeUntil(pos.get());
+            long amountScanned = currentDiskLocation - startDiskLocation;
+
+            float percentDone = 100.0f * amountScanned / totalDiskToScan;
+
+            int r = redoCounter.get();
+            long t = redoTime.get();
+            int redoDiff = r - lastRedoCounter;
+            long redoTimeDiff = t - lastRedoTime;
+            lastRedoCounter = r;
+            lastRedoTime = t;
+
+            if (redoDiff != 0 && redoTimeDiff != 0) {
+                float rate = 1000.0f * redoDiff / redoTimeDiff;
+                LOG.info("  " + percentDone + "% done: " + lastRedoCounter + " operations replayed.  Replay rate: " + rate + " operations/s");
+            }
+        }
+
+        recoveryThread.join();
+        if (error.get() != null) {
+            LOG.info("Recovery failed: " + error.get());
+            throw new Exception("Recovery Failed: " + error.get().getMessage(), error.get());
+        }
+
+        lastRedoCounter = redoCounter.get();
+        lastRedoTime = redoTime.get();
+        if (lastRedoCounter != 0 && lastRedoTime != 0) {
+            float rate = 1000.0f * lastRedoCounter / lastRedoTime;
+            LOG.info("  100% done: " + lastRedoCounter + " operations replayed.  Replay rate: " + rate + " operations/s");
+        }
+    }
+
+    /**
+     * @return the wire format currently held by this instance
+     */
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+    /**
+     * @param wireFormat the wire format to store on this instance
+     */
+    public void setWireFormat(WireFormat wireFormat) {
+        this.wireFormat = wireFormat;
+    }
+
+    /**
+     * @return the journal held by this instance
+     */
+    public Journal getJournal() {
+        return journal;
+    }
+
+    /**
+     * Hands a transaction operation to the journal by delegating to
+     * <code>journal.record(txOperation, onComplete)</code>.
+     *
+     * @param txOperation the transaction operation to record
+     * @param onComplete callback passed through to the journal unchanged
+     * @throws Exception whatever the journal throws
+     */
+    public void record(TransactionInfo txOperation, Runnable onComplete) throws Exception {
+        journal.record(txOperation, onComplete);
+    }
+
+    /**
+     * @return the configured cleanup interval (unit not visible here; presumably milliseconds - TODO confirm)
+     */
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
+    /**
+     * @param cleanupInterval the cleanup interval to store (unit not visible here; presumably milliseconds - TODO confirm)
+     */
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManagerFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManagerFactory.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManagerFactory.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManagerFactory.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.broker.router.index.api.DataIndexManager;
+import org.apache.activemq.broker.router.index.jpa.JpaDataIndexManager;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * 
+ * @author chirino
+ */
+public class JournalDataStoreManagerFactory implements FactoryBean {
+
+    private AtomicLong storeSize = new AtomicLong(1024 * 1024 * 32);
+    private boolean archiveDataLogs = false;
+    private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private boolean useNio = false;
+    private WireFormat wireFormat;
+    private DataIndexManager dataIndexManager;
+    private AsyncDataManager asyncDataManager;
+    private boolean deleteAllMessages;
+    private File dataDirectory;
+    private File archiveDirectory;
+    private File journalDirectory;
+    private File indexDatabase;
+    private Properties entityManagerProperties;
+
+    public Object getObject() throws Exception {
+        return createJournalDataStoreManager();
+    }
+
+    public JournalDataStoreManager createJournalDataStoreManager() throws Exception {
+        getJournalDirectory().mkdirs();
+        if (archiveDataLogs) {
+            getArchiveDirectory().mkdirs();
+        }
+
+        JournalDataStoreManager rc = new JournalDataStoreManager();
+        rc.setAsyncDataManager(getAsyncDataManager());
+        rc.setDataIndexManager(getDataIndexManager());
+        rc.setWireFormat(getWireFormat());
+        rc.setDeleteAllMessages(isDeleteAllMessages());
+        return rc;
+    }
+
+    public DataIndexManager createDataIndexManager() throws IOException {
+        JpaDataIndexManager rc = new JpaDataIndexManager();
+        rc.setEntityManagerProperties(getEntityManagerProperties());
+        return rc;
+    }
+
+    public WireFormat createWireFormat() {
+        OpenWireFormat owf = new OpenWireFormat();
+        owf.setCacheEnabled(false);
+        owf.setTightEncodingEnabled(false);
+        owf.setSizePrefixDisabled(true);
+        return owf;
+    }
+
+    public AsyncDataManager createAsyncDataManager() {
+        AsyncDataManager manager = new AsyncDataManager(getStoreSize());
+        manager.setDirectory(getJournalDirectory());
+        manager.setDirectoryArchive(getArchiveDirectory());
+        manager.setArchiveDataLogs(isArchiveDataLogs());
+        manager.setMaxFileLength(getMaxFileLength());
+        manager.setUseNio(isUseNio());
+        return manager;
+    }
+
+    public Class<JournalDataStoreManager> getObjectType() {
+        return JournalDataStoreManager.class;
+    }
+
+    public boolean isSingleton() {
+        return false;
+    }
+
+    public AtomicLong getStoreSize() {
+        return storeSize;
+    }
+
+    public void setStoreSize(AtomicLong storeSize) {
+        this.storeSize = storeSize;
+    }
+
+    public boolean isArchiveDataLogs() {
+        return archiveDataLogs;
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public boolean isUseNio() {
+        return useNio;
+    }
+
+    public void setUseNio(boolean useNio) {
+        this.useNio = useNio;
+    }
+
+    public WireFormat getWireFormat() {
+        if (wireFormat == null) {
+            wireFormat = createWireFormat();
+        }
+        return wireFormat;
+    }
+
+    public void setWireFormat(WireFormat wireFormat) {
+        this.wireFormat = wireFormat;
+    }
+
+    public DataIndexManager getDataIndexManager() throws IOException {
+        if (dataIndexManager == null) {
+            dataIndexManager = createDataIndexManager();
+        }
+        return dataIndexManager;
+    }
+
+    public void setDataIndexManager(DataIndexManager dataIndexManager) {
+        this.dataIndexManager = dataIndexManager;
+    }
+
+    public AsyncDataManager getAsyncDataManager() {
+        if (asyncDataManager == null) {
+            asyncDataManager = createAsyncDataManager();
+        }
+        return asyncDataManager;
+    }
+
+    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
+        this.asyncDataManager = asyncDataManager;
+    }
+
+    public boolean isDeleteAllMessages() {
+        return deleteAllMessages;
+    }
+
+    public void setDeleteAllMessages(boolean deleteAllMessages) {
+        this.deleteAllMessages = deleteAllMessages;
+    }
+
+    public File getDataDirectory() {
+        if (dataDirectory == null) {
+            dataDirectory = new File(IOHelper.getDefaultDataDirectory());
+        }
+        return dataDirectory;
+    }
+
+    public void setDataDirectory(File dataDirectory) {
+        this.dataDirectory = dataDirectory;
+    }
+
+    public File getJournalDirectory() {
+        if (journalDirectory == null) {
+            journalDirectory = new File(getDataDirectory(), "journal");
+        }
+        return journalDirectory;
+    }
+
+    public void setJournalDirectory(File journalDirectory) {
+        this.journalDirectory = journalDirectory;
+    }
+
+    public File getArchiveDirectory() {
+        if (this.archiveDirectory == null) {
+            this.archiveDirectory = new File(getDataDirectory(), "archive");
+        }
+        return archiveDirectory;
+    }
+
+    public void setArchiveDirectory(File directoryArchive) {
+        this.archiveDirectory = directoryArchive;
+    }
+
+    public File getIndexDatabase() {
+        if (indexDatabase == null) {
+            indexDatabase = new File(new File(getDataDirectory(), "index"), "hsqldb");
+        }
+        return indexDatabase;
+    }
+
+    public void setIndexDatabase(File indexDatabase) {
+        this.indexDatabase = indexDatabase;
+    }
+
+    public Properties getEntityManagerProperties() throws IOException {
+        if (entityManagerProperties == null) {
+
+            // Settings for H2
+            entityManagerProperties = new Properties();
+            entityManagerProperties.put("openjpa.ConnectionDriverName", "org.apache.commons.dbcp.BasicDataSource");
+            entityManagerProperties.put("openjpa.ConnectionProperties", "DriverClassName=org.h2.Driver," + "Url=jdbc:h2:file:" + getIndexDatabase().getCanonicalPath() + ";LOCK_MODE=0,LOG=2,"
+                    + "MaxActive=5000,MaxWait=1000,TestOnBorrow=true");
+            // entityManagerProperties.put("openjpa.ConnectionProperties",
+            // "DriverClassName=org.h2.Driver," +
+            // "Url=jdbc:h2:file:"+getIndexDatabase().getCanonicalPath()+";LOCK_MODE=0,LOG=2"+
+            // "MaxActive=5000,MaxWait=1000,TestOnBorrow=true"
+            // );
+            entityManagerProperties.put("openjpa.jdbc.DBDictionary", "org.apache.activemq.broker.router.store.journal.MyH2Dictionary");
+
+            // Settings for HSQL
+            // entityManagerProperties.put("openjpa.ConnectionDriverName",
+            // "org.apache.commons.dbcp.BasicDataSource");
+            // entityManagerProperties.put("openjpa.ConnectionProperties",
+            // "DriverClassName=org.hsqldb.jdbcDriver," +
+            // "Url=jdbc:hsqldb:file:"+getIndexDatabase().getCanonicalPath()+","+
+            // "Username=sa,Password=,"+
+            // "MaxActive=5000,MaxWait=1000,TestOnBorrow=false"
+            // );
+            // entityManagerProperties.put("openjpa.jdbc.DBDictionary",
+            // "hsql(CacheTables=true)");
+
+            // entityManagerProperties.put("openjpa.jdbc.Schema", "AMQ_");
+            // entityManagerProperties.put("openjpa.Log",
+            // "DefaultLevel=WARN,SQL=TRACE");
+            entityManagerProperties.put("openjpa.Log", "DefaultLevel=ERROR");
+            entityManagerProperties.put("openjpa.jdbc.SynchronizeMappings", "buildSchema");
+
+        }
+        return entityManagerProperties;
+    }
+
+    public void setEntityManagerProperties(Properties entityManagerProperties) {
+        this.entityManagerProperties = entityManagerProperties;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.broker.router.store.journal.JournalDataStore.JournalCacheEntry;
+import org.apache.activemq.broker.router.store.journal.data.Journal;
+import org.apache.activemq.broker.router.store.journal.data.RemoveReferenceJournalEntry;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * 
+ * @author chirino
+ */
+public class JournalReferenceStore extends JournalStore implements ReferenceStore {
+
+    private final JournalDataStore dataStore;
+    private final ReferenceIndex index;
+    private Journal journal;
+
+    class ReferenceCacheEntry implements CacheEntry {
+        final JournalCacheEntry cacheEntry;
+        final IndexEntry indexEntry;
+
+        public ReferenceCacheEntry(JournalCacheEntry cacheEntry, IndexEntry indexEntry) {
+            this.cacheEntry = cacheEntry;
+            this.indexEntry = indexEntry;
+        }
+
+        public long getId() {
+            return indexEntry.getId();
+        }
+
+        public Message getMessage() {
+            return cacheEntry.getMessage();
+        }
+
+        public DataStore getStore() {
+            return dataStore;
+        }
+
+        public void incrementRedeliveryCounter() {
+            cacheEntry.incrementRedeliveryCounter();
+        }
+
+        public void load() {
+            cacheEntry.load();
+        }
+
+        public boolean lock() {
+            return cacheEntry.lock();
+        }
+
+        public void unload() {
+            cacheEntry.unload();
+        }
+
+    }
+
+    public JournalReferenceStore(JournalDataStore dataStore, ReferenceIndex index) {
+        super(index);
+        this.dataStore = dataStore;
+        this.index = index;
+        this.journal = this.dataStore.getDataStoreManager().getJournal();
+    }
+
+    public void addReference(CacheEntry cacheEntry) throws Exception {
+        JournalCacheEntry ce = (JournalCacheEntry) cacheEntry;
+        index.addReference(ce.indexEntry);
+    }
+
+    synchronized public List<CacheEntry> load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception {
+
+        ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE;
+        ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE;
+
+        IndexEntry firstIE = first == null ? null : first.indexEntry;
+        IndexEntry lastIE = last == null ? null : last.indexEntry;
+
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(max);
+
+        // We have to keep looping to do a max load cause we may fail to load
+        // some items
+        // reported by the index since folks could be concurrently removing
+        // them.
+        while (rc.size() < max) {
+            List<IndexEntry> load = index.load(firstIE, lastIE, max);
+            for (IndexEntry ie : load) {
+                // don't load any more than max items.
+                if (rc.size() >= max) {
+                    return rc;
+                }
+                JournalCacheEntry cacheEntry = dataStore.load(ie);
+                if (cacheEntry != null) {
+                    ReferenceCacheEntry record = new ReferenceCacheEntry(cacheEntry, ie);
+                    rc.add(record);
+                }
+
+                // Keeps track of where the next index load starts from.
+                firstIE = ie;
+            }
+            // We got a short load from the index.. it won't have any more
+            // items.
+            if (load.size() < max) {
+                break;
+            }
+        }
+
+        return rc;
+    }
+
+    public List<CacheEntry> remove(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception {
+        ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE;
+        ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE;
+
+        IndexEntry firstIE = first == null ? null : first.indexEntry;
+        IndexEntry lastIE = last == null ? null : last.indexEntry;
+
+        // TODO: we may need to loop since we could get a list < max due to
+        // expired references.
+        List<IndexEntry> load = index.remove(firstIE, lastIE, max);
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(load.size());
+        for (IndexEntry indexEntry : load) {
+            JournalCacheEntry cacheEntry = dataStore.load(indexEntry);
+            ReferenceCacheEntry record = new ReferenceCacheEntry(cacheEntry, indexEntry);
+            if (record != null) {
+                rc.add(record);
+
+            }
+        }
+        return rc;
+    }
+
+    public long size() throws Exception {
+        return index.size();
+    }
+
+    public CacheEntry getLastAddedEntry() throws Exception {
+        IndexEntry indexEntry = index.getLastAddedId();
+        if (indexEntry == null) {
+            return null;
+        }
+        return new ReferenceCacheEntry(null, indexEntry);
+    }
+
+    public void remove(long id, Runnable onCompleted) throws Exception {
+        index.remove(id);
+    }
+
+    public RemoveReferenceTransactionAction removeReferenceTransacted(long id, TransactionId tx, Runnable onCompleted) throws Exception {
+        final RemoveReferenceJournalEntry rc = journal.removeReference(dataStore.getName(), getName(), id, tx, onCompleted);
+        return createRemoveReferenceTransactionAction(rc);
+    }
+
+    public RemoveReferenceTransactionAction createRemoveReferenceTransactionAction(final RemoveReferenceJournalEntry rc) {
+        return new RemoveReferenceTransactionAction() {
+            public void complete() throws Exception {
+                index.remove(rc.getId());
+            }
+
+            public long getId() {
+                return rc.getId();
+            }
+
+            public ReferenceStore getReferenceStore() {
+                return JournalReferenceStore.this;
+            }
+
+            public DataStore getDataStore() {
+                return dataStore;
+            }
+
+            public TransactionId getTransactionId() {
+                return rc.getTransactionId();
+            }
+        };
+    }
+
+    public ReferenceIndex getIndex() {
+        return index;
+    }
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.router.index.api.Index;
+import org.apache.activemq.broker.router.store.api.Store;
+
+/**
+ * Common base of the journal backed stores. It keeps no state of its own beyond the
+ * {@link Index} supplied at construction: the store's name and its properties are
+ * read from and written to that index.
+ *
+ * @author chirino
+ */
+public abstract class JournalStore implements Store {
+
+    private final Index index;
+
+    /**
+     * @param index the index that supplies this store's name and properties
+     */
+    public JournalStore(Index index) {
+        this.index = index;
+    }
+
+    /**
+     * @return the name reported by the index
+     */
+    public String getName() {
+        return this.index.getName();
+    }
+
+    /**
+     * @param properties the properties to hand to the index
+     * @throws Exception if the index fails
+     */
+    public void setProperties(Map<String, String> properties) throws Exception {
+        this.index.setProperties(properties);
+    }
+
+    /**
+     * @return the properties reported by the index
+     * @throws Exception if the index fails
+     */
+    public Map<String, String> getProperties() throws Exception {
+        return this.index.getProperties();
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStoreManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStoreManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStoreManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStoreManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.activemq.broker.router.store.api.Store;
+import org.apache.activemq.broker.router.store.api.StoreManager;
+
+/**
+ * Base class for store managers that track their stores in a map keyed by store name.
+ * The map is obtained from {@link #createExistingStores()} the first time any public
+ * method is called. All public methods are synchronized on the manager.
+ *
+ * @author chirino
+ */
+public abstract class JournalStoreManager<T extends Store> implements StoreManager<T> {
+
+    private HashMap<String, T> stores;
+
+    /**
+     * Returns the store with the given name, creating and registering it if absent.
+     *
+     * @param name the store name
+     * @return the existing or newly created store
+     * @throws Exception if the existing stores cannot be loaded or the store cannot be created
+     */
+    public synchronized T addStore(String name) throws Exception {
+        HashMap<String, T> known = knownStores();
+        T store = known.get(name);
+        if (store != null) {
+            return store;
+        }
+        store = createStore(name);
+        known.put(name, store);
+        return store;
+    }
+
+    protected abstract T createStore(String name) throws Exception;
+
+    protected abstract void destroyStore(T store) throws Exception;
+
+    protected abstract HashMap<String, T> createExistingStores() throws Exception;
+
+    /**
+     * @param name the store name
+     * @return the registered store, or <code>null</code> when none has that name
+     * @throws Exception if the existing stores cannot be loaded
+     */
+    public synchronized T getStore(String name) throws Exception {
+        return knownStores().get(name);
+    }
+
+    /**
+     * @return a new list holding every registered store
+     * @throws Exception if the existing stores cannot be loaded
+     */
+    public synchronized List<T> getStores() throws Exception {
+        return new ArrayList<T>(knownStores().values());
+    }
+
+    /**
+     * Unregisters the store by name and then destroys it.
+     *
+     * @param store the store to remove
+     * @throws Exception if the existing stores cannot be loaded or the store cannot be destroyed
+     */
+    public synchronized void removeStore(T store) throws Exception {
+        knownStores().remove(store.getName());
+        destroyStore(store);
+    }
+
+    /**
+     * Returns the store map, asking {@link #createExistingStores()} for it on first use.
+     * Callers must hold this manager's monitor.
+     */
+    private HashMap<String, T> knownStores() throws Exception {
+        if (stores == null) {
+            stores = createExistingStores();
+        }
+        return stores;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/MyH2Dictionary.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/MyH2Dictionary.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/MyH2Dictionary.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/MyH2Dictionary.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import org.apache.openjpa.jdbc.sql.H2Dictionary;
+
+/**
+ * The default H2Dictionary fails to work with the latest H2. This subclass works
+ * around that by overriding {@link #getSequencesSQL(String, String)} to return
+ * <code>null</code> instead of the query the stock dictionary would build.
+ * 
+ * @author chirino
+ */
+public class MyH2Dictionary extends H2Dictionary {
+
+    /**
+     * Always returns <code>null</code>, whatever schema or sequence name is given.
+     * NOTE(review): presumably OpenJPA treats a null query as "no sequence lookup
+     * available" - confirm against the OpenJPA version in use.
+     */
+    @Override
+    protected String getSequencesSQL(String schemaName, String sequenceName) {
+        return null;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/AddMessageJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/AddMessageJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/AddMessageJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/AddMessageJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Journal entry recording that a message was added to a data store under a given id.
+ * Its serialized form is the parent's fields, followed by the id as a long, followed
+ * by the message marshalled with the journal's wire format.
+ *
+ * @author chirino
+ */
+public class AddMessageJournalEntry extends StoreTxJournalEntry {
+
+    private Message message;
+    private long id;
+
+    /**
+     * Creates an empty entry, to be filled through the setters or
+     * {@link #readExternal(Journal, DataInput)}.
+     */
+    public AddMessageJournalEntry() {
+    }
+
+    /**
+     * @param dataStore name of the data store, passed to the parent entry
+     * @param tx transaction id, passed to the parent entry
+     * @param id id under which the message is stored
+     * @param message the message that was added
+     */
+    public AddMessageJournalEntry(String dataStore, TransactionId tx, long id, Message message) {
+        super(dataStore, tx);
+        this.message = message;
+        this.id = id;
+    }
+
+    /**
+     * @return {@link Journal#JOURNAL_ENTRY_ADD_MESSAGE_TYPE}
+     */
+    @Override
+    public byte getType() {
+        return Journal.JOURNAL_ENTRY_ADD_MESSAGE_TYPE;
+    }
+
+    public long getId() {
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public Message getMessage() {
+        return this.message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    /**
+     * Writes the parent's fields, then the id, then the marshalled message.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        super.writeExternal(journal, out);
+        out.writeLong(this.id);
+        journal.getWireFormat().marshal(this.message, out);
+    }
+
+    /**
+     * Reads the fields back in the order {@link #writeExternal(Journal, DataOutput)}
+     * wrote them.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        super.readExternal(journal, in);
+        this.id = in.readLong();
+        this.message = (Message) journal.getWireFormat().unmarshal(in);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/Journal.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/Journal.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/Journal.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Writes {@link JournalEntry} records to an {@link AsyncDataManager} and reads
+ * them back.
+ * <p>
+ * Every record is encoded as a single type byte (one of the
+ * {@code JOURNAL_ENTRY_*_TYPE} constants) followed by whatever the entry
+ * emits from {@link JournalEntry#writeExternal(Journal, java.io.DataOutput)}.
+ *
+ * @author chirino
+ */
+public class Journal {
+
+    public static final byte JOURNAL_ENTRY_TRACE_TYPE = 0;
+    public static final byte JOURNAL_ENTRY_ADD_MESSAGE_TYPE = 1;
+    public static final byte JOURNAL_ENTRY_REMOVE_MESSAGE_TYPE = 2;
+    public static final byte JOURNAL_ENTRY_REMOVE_REFERENCE_TYPE = 3;
+    public static final byte JOURNAL_ENTRY_TRANSACTION_TYPE = 4;
+
+    private final AsyncDataManager asyncDataManager;
+    private final WireFormat wireFormat;
+
+    /**
+     * @param asyncDataManager data manager that records are written to and read from
+     * @param wireFormat wire format exposed to the entries through {@link #getWireFormat()}
+     */
+    public Journal(AsyncDataManager asyncDataManager, WireFormat wireFormat) {
+        this.asyncDataManager = asyncDataManager;
+        this.wireFormat = wireFormat;
+    }
+
+    /**
+     * Serializes a record as its type byte followed by its external form.
+     *
+     * @param record the entry to serialize
+     * @return the encoded bytes
+     * @throws IOException if writing the entry fails
+     */
+    ByteSequence encode(JournalEntry record) throws IOException {
+        DataByteArrayOutputStream out = new DataByteArrayOutputStream();
+        out.writeByte(record.getType());
+        record.writeExternal(this, out);
+        out.close();
+        return out.toByteSequence();
+    }
+
+    /**
+     * Rebuilds an entry from bytes produced by {@link #encode(JournalEntry)}.
+     * The leading type byte selects the entry class, which then reads its own
+     * fields from the remaining data.
+     *
+     * @param sequence the encoded record
+     * @return the decoded entry; its location is not set here
+     * @throws IOException if the type byte is unknown or the entry cannot be read
+     */
+    JournalEntry decode(ByteSequence sequence) throws IOException {
+        DataByteArrayInputStream in = new DataByteArrayInputStream(sequence);
+        JournalEntry rc;
+        byte type = in.readByte();
+        switch (type) {
+        case JOURNAL_ENTRY_TRACE_TYPE:
+            rc = new TraceJournalEntry();
+            break;
+        case JOURNAL_ENTRY_ADD_MESSAGE_TYPE:
+            rc = new AddMessageJournalEntry();
+            break;
+        case JOURNAL_ENTRY_REMOVE_MESSAGE_TYPE:
+            rc = new RemoveMessageJournalEntry();
+            break;
+        case JOURNAL_ENTRY_REMOVE_REFERENCE_TYPE:
+            rc = new RemoveReferenceJournalEntry();
+            break;
+        case JOURNAL_ENTRY_TRANSACTION_TYPE:
+            rc = new TxJournalEntry();
+            break;
+        default:
+            throw new IOException("Unknown journal record type: " + type);
+        }
+        rc.readExternal(this, in);
+        return rc;
+    }
+
+    /**
+     * Reads the record stored at the given location and returns the message it
+     * carries.
+     *
+     * @param location location of an add message record
+     * @return the message held by that record
+     * @throws IOException if the record cannot be read or decoded, or if the
+     *             record at that location is not an add message entry
+     */
+    public Message loadData(Location location) throws IllegalStateException, IOException {
+        JournalEntry entry = decode(asyncDataManager.read(location));
+        // Only add message records carry a message; report anything else as a
+        // journal problem instead of failing with a bare ClassCastException.
+        if (!(entry instanceof AddMessageJournalEntry)) {
+            throw new IOException("Journal record at " + location + " is not an add message entry, found record type: " + entry.getType());
+        }
+        return ((AddMessageJournalEntry) entry).getMessage();
+    }
+
+    /**
+     * Writes an add message record.
+     *
+     * @param store name of the data store recorded in the entry
+     * @param id id recorded with the message
+     * @param message the message to record
+     * @param tx transaction id recorded in the entry
+     * @param onCompleted callback handed to the data manager together with the record
+     * @return the written entry, with its location set
+     */
+    public AddMessageJournalEntry addData(String store, long id, Message message, TransactionId tx, Runnable onCompleted) throws IllegalStateException, IOException {
+        AddMessageJournalEntry record = new AddMessageJournalEntry(store, tx, id, message);
+        record.setLocation(asyncDataManager.write(encode(record), onCompleted));
+        return record;
+    }
+
+    /**
+     * Writes a trace record holding the given text. The data manager is called
+     * with {@code false} as its second argument (presumably the sync flag;
+     * confirm against AsyncDataManager).
+     *
+     * @param message text stored in the trace record
+     * @return the written entry, with its location set
+     */
+    public TraceJournalEntry trace(String message) throws IllegalStateException, IOException {
+        TraceJournalEntry record = new TraceJournalEntry(message);
+        record.setLocation(asyncDataManager.write(encode(record), false));
+        return record;
+    }
+
+    /**
+     * Writes a remove message record.
+     *
+     * @param store name of the data store recorded in the entry
+     * @param id id of the message being removed
+     * @param tx transaction id recorded in the entry
+     * @param onCompleted callback handed to the data manager together with the record
+     * @return the written entry, with its location set
+     */
+    public RemoveMessageJournalEntry removeData(String store, long id, TransactionId tx, Runnable onCompleted) throws IllegalStateException, IOException {
+        RemoveMessageJournalEntry record = new RemoveMessageJournalEntry(store, tx, id);
+        record.setLocation(asyncDataManager.write(encode(record), onCompleted));
+        return record;
+    }
+
+    /**
+     * Writes a remove reference record.
+     *
+     * @param store name of the data store recorded in the entry
+     * @param refStore name of the reference store recorded in the entry
+     * @param id id recorded in the entry
+     * @param tx transaction id recorded in the entry
+     * @param onCompleted callback handed to the data manager together with the record
+     * @return the written entry, with its location set
+     */
+    public RemoveReferenceJournalEntry removeReference(String store, String refStore, long id, TransactionId tx, Runnable onCompleted) throws IllegalStateException, IOException {
+        RemoveReferenceJournalEntry record = new RemoveReferenceJournalEntry(store, tx, id, refStore);
+        record.setLocation(asyncDataManager.write(encode(record), onCompleted));
+        return record;
+    }
+
+    /**
+     * Writes a trace record with the text "SYNC". The data manager is called
+     * with {@code true} as its second argument (presumably requesting a
+     * synchronous write; confirm against AsyncDataManager).
+     *
+     * @return the written entry, with its location set
+     */
+    public TraceJournalEntry synch() throws IllegalStateException, IOException {
+        TraceJournalEntry record = new TraceJournalEntry("SYNC");
+        record.setLocation(asyncDataManager.write(encode(record), true));
+        return record;
+    }
+
+    /**
+     * Writes a transaction record.
+     *
+     * @param txOperation the transaction info to record
+     * @param onCompleted callback handed to the data manager together with the record
+     * @return the written entry, with its location set
+     */
+    public TxJournalEntry record(TransactionInfo txOperation, Runnable onCompleted) throws IllegalStateException, IOException {
+        TxJournalEntry record = new TxJournalEntry(txOperation);
+        record.setLocation(asyncDataManager.write(encode(record), onCompleted));
+        return record;
+    }
+
+    /**
+     * Reads the entry that follows the given position; used for journal replay.
+     *
+     * @param pos position to continue from, as accepted by the data manager's
+     *            {@code getNextLocation}
+     * @return the next entry with its location set, or {@code null} when the
+     *         data manager reports no further location
+     */
+    public JournalEntry getNextEntry(Location pos) throws IllegalStateException, IOException {
+        pos = asyncDataManager.getNextLocation(pos);
+        if (pos == null) {
+            return null;
+        }
+
+        ByteSequence data = asyncDataManager.read(pos);
+        JournalEntry rc = decode(data);
+        rc.setLocation(pos);
+        return rc;
+    }
+
+    /**
+     * @return the wire format entries use to marshal and unmarshal their payloads
+     */
+    public WireFormat getWireFormat() {
+        return wireFormat;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/JournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/JournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/JournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/JournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * Base class of every record kept in a {@link Journal}.
+ * <p>
+ * Subclasses report their record type and know how to write themselves to,
+ * and read themselves from, a data stream. The entry also remembers the
+ * {@link Location} assigned to it through {@link #setLocation(Location)}.
+ *
+ * @author chirino
+ */
+public abstract class JournalEntry {
+
+    private Location location;
+
+    /**
+     * @return the type byte identifying the concrete entry class
+     */
+    public abstract byte getType();
+
+    /**
+     * Writes the fields of this entry to the given output.
+     */
+    public abstract void writeExternal(Journal journal, DataOutput out) throws IOException;
+
+    /**
+     * Reads the fields of this entry from the given input.
+     */
+    public abstract void readExternal(Journal journal, DataInput in) throws IOException;
+
+    public void setLocation(Location location) {
+        this.location = location;
+    }
+
+    /**
+     * @return the location last set on this entry, or {@code null} if none was set
+     */
+    public Location getLocation() {
+        return this.location;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveMessageJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveMessageJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveMessageJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveMessageJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Journal entry identifying a message to remove from a data store.
+ * <p>
+ * After the fields serialized by {@link StoreTxJournalEntry}, the entry
+ * stores the message id as a long.
+ *
+ * @author chirino
+ */
+public class RemoveMessageJournalEntry extends StoreTxJournalEntry {
+
+    private long id;
+
+    /**
+     * Creates an empty entry whose fields are filled in later, either through
+     * the setters or by {@link #readExternal(Journal, DataInput)}.
+     */
+    public RemoveMessageJournalEntry() {
+        super();
+    }
+
+    /**
+     * @param dataStore name of the data store, passed on to the super class
+     * @param tx transaction id, passed on to the super class
+     * @param id id of the message
+     */
+    public RemoveMessageJournalEntry(String dataStore, TransactionId tx, long id) {
+        super(dataStore, tx);
+        this.id = id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the id of the message
+     */
+    public long getId() {
+        return this.id;
+    }
+
+    /**
+     * @return {@link Journal#JOURNAL_ENTRY_REMOVE_MESSAGE_TYPE}
+     */
+    @Override
+    public byte getType() {
+        return Journal.JOURNAL_ENTRY_REMOVE_MESSAGE_TYPE;
+    }
+
+    /**
+     * Reads the super class fields, then the id.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        super.readExternal(journal, in);
+        this.id = in.readLong();
+    }
+
+    /**
+     * Writes the super class fields, then the id.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        super.writeExternal(journal, out);
+        out.writeLong(this.id);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveReferenceJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveReferenceJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveReferenceJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveReferenceJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Remove entry that additionally names a reference store.
+ * <p>
+ * After the fields serialized by {@link RemoveMessageJournalEntry}, the entry
+ * stores the reference store name as a UTF string.
+ *
+ * @author chirino
+ */
+public class RemoveReferenceJournalEntry extends RemoveMessageJournalEntry {
+
+    private String referenceStore;
+
+    /**
+     * Creates an empty entry whose fields are filled in later, either through
+     * the setters or by {@link #readExternal(Journal, DataInput)}.
+     */
+    public RemoveReferenceJournalEntry() {
+        super();
+    }
+
+    /**
+     * @param dataStore name of the data store, passed on to the super class
+     * @param tx transaction id, passed on to the super class
+     * @param id id, passed on to the super class
+     * @param referenceStore name of the reference store
+     */
+    public RemoveReferenceJournalEntry(String dataStore, TransactionId tx, long id, String referenceStore) {
+        super(dataStore, tx, id);
+        this.referenceStore = referenceStore;
+    }
+
+    public void setReferenceStore(String referenceStore) {
+        this.referenceStore = referenceStore;
+    }
+
+    /**
+     * @return the name of the reference store
+     */
+    public String getReferenceStore() {
+        return this.referenceStore;
+    }
+
+    /**
+     * @return {@link Journal#JOURNAL_ENTRY_REMOVE_REFERENCE_TYPE}
+     */
+    @Override
+    public byte getType() {
+        return Journal.JOURNAL_ENTRY_REMOVE_REFERENCE_TYPE;
+    }
+
+    /**
+     * Reads the super class fields, then the reference store name.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        super.readExternal(journal, in);
+        this.referenceStore = in.readUTF();
+    }
+
+    /**
+     * Writes the super class fields, then the reference store name.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        super.writeExternal(journal, out);
+        out.writeUTF(this.referenceStore);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Base class for journal entries that name a data store.
+ * <p>
+ * The data store name is the only field serialized here, as a UTF string.
+ * The accessors are spelled {@code getDatatStore}/{@code setDatatStore}; that
+ * spelling is the public name of these methods and is left as it is.
+ *
+ * @author chirino
+ */
+public abstract class StoreJournalEntry extends JournalEntry {
+
+    private String dataStore;
+
+    /**
+     * Creates an entry without a data store name; it is set later through
+     * {@link #setDatatStore(String)} or {@link #readExternal(Journal, DataInput)}.
+     */
+    public StoreJournalEntry() {
+        super();
+    }
+
+    /**
+     * @param datatStore name of the data store
+     */
+    public StoreJournalEntry(String datatStore) {
+        this.dataStore = datatStore;
+    }
+
+    /**
+     * @return the name of the data store
+     */
+    public String getDatatStore() {
+        return this.dataStore;
+    }
+
+    public void setDatatStore(String datatStore) {
+        this.dataStore = datatStore;
+    }
+
+    /**
+     * Writes the data store name.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        out.writeUTF(this.dataStore);
+    }
+
+    /**
+     * Reads the data store name.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        this.dataStore = in.readUTF();
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreTxJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreTxJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreTxJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreTxJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Base class for data store entries that also carry a transaction id.
+ * <p>
+ * After the data store name serialized by {@link StoreJournalEntry}, the
+ * entry stores the transaction id marshalled with the journal's wire format.
+ *
+ * @author chirino
+ */
+public abstract class StoreTxJournalEntry extends StoreJournalEntry {
+
+    private TransactionId transactionId;
+
+    /**
+     * Creates an empty entry whose fields are filled in later, either through
+     * the setters or by {@link #readExternal(Journal, DataInput)}.
+     */
+    public StoreTxJournalEntry() {
+    }
+
+    /**
+     * @param dataStore name of the data store, passed on to the super class
+     * @param tx transaction id recorded in this entry
+     */
+    public StoreTxJournalEntry(String dataStore, TransactionId tx) {
+        super(dataStore);
+        // Assign the field directly: calling the overridable setter from the
+        // constructor could run subclass code before the subclass is initialized.
+        this.transactionId = tx;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return the transaction id recorded in this entry
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Writes the super class fields, then the marshalled transaction id.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        super.writeExternal(journal, out);
+        journal.getWireFormat().marshal(transactionId, out);
+    }
+
+    /**
+     * Reads the super class fields, then unmarshals the transaction id.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        super.readExternal(journal, in);
+        transactionId = (TransactionId) journal.getWireFormat().unmarshal(in);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TraceJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TraceJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TraceJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TraceJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Journal entry holding a piece of text.
+ * <p>
+ * The text is the only field serialized, as a UTF string.
+ *
+ * @author chirino
+ */
+public class TraceJournalEntry extends JournalEntry {
+
+    private String message;
+
+    /**
+     * Creates an entry without text; it is set later through
+     * {@link #setMessage(String)} or {@link #readExternal(Journal, DataInput)}.
+     */
+    public TraceJournalEntry() {
+        super();
+    }
+
+    /**
+     * @param message text held by this entry
+     */
+    public TraceJournalEntry(String message) {
+        this.message = message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the text held by this entry
+     */
+    public String getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @return {@link Journal#JOURNAL_ENTRY_TRACE_TYPE}
+     */
+    @Override
+    public byte getType() {
+        return Journal.JOURNAL_ENTRY_TRACE_TYPE;
+    }
+
+    /**
+     * Writes the text.
+     */
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        out.writeUTF(this.message);
+    }
+
+    /**
+     * Reads the text.
+     */
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        this.message = in.readUTF();
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TxJournalEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TxJournalEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TxJournalEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TxJournalEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.command.TransactionInfo;
+
+/**
+ * 
+ * @author chirino
+ */
+public class TxJournalEntry extends JournalEntry {
+
+    private TransactionInfo transaction;
+
+    public TxJournalEntry() {
+    }
+
+    public TxJournalEntry(TransactionInfo transaction) {
+        this.transaction = transaction;
+    }
+
+    public TransactionInfo getTransaction() {
+        return transaction;
+    }
+
+    public void setTransaction(TransactionInfo transaction) {
+        this.transaction = transaction;
+    }
+
+    @Override
+    public byte getType() {
+        return Journal.JOURNAL_ENTRY_TRANSACTION_TYPE;
+    }
+
+    @Override
+    public void readExternal(Journal journal, DataInput in) throws IOException {
+        transaction = (TransactionInfo) journal.getWireFormat().unmarshal(in);
+    }
+
+    @Override
+    public void writeExternal(Journal journal, DataOutput out) throws IOException {
+        journal.getWireFormat().marshal(transaction, out);
+    }
+
+}
\ No newline at end of file



Mime
View raw message