activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r760028 [2/3] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 30 Mar 2009 16:20:31 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Mar 30 16:20:28 2009
@@ -21,43 +21,59 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
 import org.apache.activemq.broker.protocol.ProtocolHandlerFactory;
 import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.FatalStoreException;
 import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.memory.MemoryStore;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IPollableFlowSource;
-import org.apache.activemq.queue.PersistentQueue;
-import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.Operation> {
 
-public class BrokerDatabase {
-
-    private Store store = new MemoryStore();
+    private final Store store;
     private final Flow databaseFlow = new Flow("database", false);
 
-    private final SizeLimiter<Operation> storeLimiter;
+    private final SizeLimiter<OperationBase> storeLimiter;
+    private final FlowController<OperationBase> storeController;
+
+    private final IDispatcher dispatcher;
     private Thread flushThread;
-    private final ExclusiveQueue<Operation> opQueue;
+    private final ConcurrentLinkedQueue<OperationBase> opQueue;
     private AtomicBoolean running = new AtomicBoolean(false);
-    private final Semaphore opsReady = new Semaphore(0);
-    private final FlowReadyListener<Operation> enqueueListener;
     private DatabaseListener listener;
 
     private HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+    private AtomicBoolean notify = new AtomicBoolean(false);
+    private Semaphore opsReady = new Semaphore(0);
+    private long opSequenceNumber;
+    private long flushPointer = 0; // The last seq num for which flush was
+    // requested
+    private long requestedDelayedFlushPointer = -1; // Set to the last sequence
+    // num scheduled for delay
+    private long delayedFlushPointer = 0; // The last delayable sequence num
+    // requested.
+    private final long FLUSH_DELAY_MS = 5;
+    private final Runnable flushDelayCallback;
 
     public interface DatabaseListener {
         /**
@@ -77,33 +93,50 @@
      * @author cmacnaug
      */
     public interface RestoredMessage {
-        MessageDelivery getMessageDelivery();
+        MessageDelivery getMessageDelivery() throws IOException;
     }
 
     public interface MessageRestoreListener {
         public void messagesRestored(Collection<RestoredMessage> msgs);
     }
 
-    public BrokerDatabase() {
-        storeLimiter = new SizeLimiter<Operation>(1024 * 512, 0) {
-            public int getElementSize(Operation op) {
+    public BrokerDatabase(Store store, IDispatcher dispatcher) {
+        this.store = store;
+        this.dispatcher = dispatcher;
+        this.opQueue = new ConcurrentLinkedQueue<OperationBase>();
+        storeLimiter = new SizeLimiter<OperationBase>(5000, 0) {
+
+            @Override
+            public int getElementSize(OperationBase op) {
                 return op.getLimiterSize();
             }
         };
-        opQueue = new ExclusiveQueue<Operation>(databaseFlow, "DataBaseQueue", storeLimiter);
-        enqueueListener = new FlowReadyListener<Operation>() {
 
-            public void onFlowReady(IPollableFlowSource<Operation> source) {
-                opsReady.release();
+        storeController = new FlowController<OperationBase>(new FlowControllable<OperationBase>() {
+
+            public void flowElemAccepted(ISourceController<OperationBase> controller, OperationBase op) {
+                addToOpQueue(op);
+            }
+
+            public IFlowResource getFlowResource() {
+                return BrokerDatabase.this;
+            }
+
+        }, databaseFlow, storeLimiter, opQueue);
+        storeController.useOverFlowQueue(false);
+
+        flushDelayCallback = new Runnable() {
+            public void run() {
+                flushDelayCallback();
             }
         };
     }
 
-    public synchronized void start() {
+    public synchronized void start() throws Exception {
         if (flushThread == null) {
 
             running.set(true);
-            opsReady.drainPermits();
+            store.start();
             flushThread = new Thread(new Runnable() {
 
                 public void run() {
@@ -115,7 +148,7 @@
         }
     }
 
-    public synchronized void stop() {
+    public synchronized void stop() throws Exception {
         if (flushThread != null) {
 
             running.set(false);
@@ -130,6 +163,9 @@
                 }
             }
 
+            store.flush();
+            store.stop();
+
             if (interrupted) {
                 Thread.currentThread().interrupt();
             }
@@ -138,49 +174,6 @@
     }
 
     /**
-     * Saves a message for all of the recipients in the
-     * {@link BrokerMessageDelivery}.
-     * 
-     * @param delivery
-     *            The delivery.
-     * @param source
-     *            The source's controller.
-     */
-    public void persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
-        add(new AddMessageOperation(delivery), source, true);
-    }
-
-    /**
-     * Saves a Message for a single queue.
-     * 
-     * @param delivery
-     *            The delivery
-     * @param queue
-     *            The queue
-     * @param source
-     *            The source initiating the save or null, if there isn't one.
-     */
-    public void saveMessage(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue, ISourceController<?> source) {
-        add(new AddMessageOperation(delivery, queue), source, false);
-    }
-
-    /**
-     * Deletes the given message from the store for the given queue.
-     * 
-     * @param delivery
-     *            The delivery.
-     * @param queue
-     *            The queue.
-     */
-    public void deleteMessage(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue) {
-        opQueue.add(new DeleteMessageOperation(delivery, queue), null);
-    }
-
-    public void restoreMessages(PersistentQueue<MessageDelivery> queue, long first, int max, MessageRestoreListener listener) {
-        opQueue.add(new RestoreMessageOperation(queue, first, max, listener), null);
-    }
-
-    /**
      * Executes user supplied {@link Operation}. If the {@link Operation} does
      * not throw any Exceptions, all updates to the store are committed,
      * otherwise they are rolled back. Any exceptions thrown by the
@@ -202,60 +195,129 @@
      *            Whether or not this operation needs immediate processing.
      * @param controller
      *            the source of the operation.
+     * @return the {@link OperationContext} associated with the operation
      */
-    private void add(Operation op, ISourceController<?> controller, boolean flush) {
-        opQueue.add(op, controller);
+    private OperationContext add(OperationBase op, ISourceController<?> controller, boolean flush) {
+
+        op.flushRequested = flush;
+        storeController.add(op, controller);
+        return op;
+    }
+
+    private synchronized final void addToOpQueue(OperationBase op) {
+        if (!running.get()) {
+            throw new IllegalStateException("BrokerDatabase not started");
+        }
+        op.opSequenceNumber = opSequenceNumber++;
+        opQueue.add(op);
+        if (op.flushRequested) {
+            if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
+                scheduleDelayedFlush(op.opSequenceNumber);
+            } else {
+                updateFlushPointer(op.opSequenceNumber);
+            }
+        }
+        if (notify.get()) {
+            opsReady.release();
+        }
+    }
+
+    private void updateFlushPointer(long seqNumber) {
+        if (seqNumber > flushPointer) {
+            if (notify.get()) {
+                opsReady.release();
+            }
+        }
+    }
+
+    private void scheduleDelayedFlush(long seqNumber) {
+        if (seqNumber < flushPointer) {
+            return;
+        }
+
+        if (seqNumber > delayedFlushPointer) {
+            delayedFlushPointer = seqNumber;
+        }
+
+        if (requestedDelayedFlushPointer == -1) {
+            requestedDelayedFlushPointer = delayedFlushPointer;
+            dispatcher.schedule(flushDelayCallback, FLUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+        }
+
+    }
+
+    private synchronized final void flushDelayCallback() {
+        if (flushPointer < requestedDelayedFlushPointer) {
+            updateFlushPointer(requestedDelayedFlushPointer);
+            requestedDelayedFlushPointer = -1;
+            // Schedule next delay if needed:
+            scheduleDelayedFlush(delayedFlushPointer);
+        }
+    }
+
+    private final OperationBase getNextOp(boolean wait) {
+        if (!wait) {
+            return opQueue.poll();
+        } else {
+            OperationBase op = opQueue.poll();
+            if (op == null) {
+                notify.set(true);
+                op = opQueue.poll();
+                try {
+                    while (running.get() && op == null) {
+                        opsReady.acquireUninterruptibly();
+                        op = opQueue.poll();
+                    }
+                } finally {
+                    notify.set(false);
+                    opsReady.drainPermits();
+                }
+            }
+            return op;
+        }
+    }
+
+    private class OpCounter {
+        int count = 0;
     }
 
     private final void processOps() {
+        final OpCounter counter = new OpCounter();
+
         while (running.get()) {
-            final Operation firstOp;
-            synchronized (opQueue) {
-                firstOp = opQueue.poll();
-                if (firstOp == null) {
-                    opQueue.addFlowReadyListener(enqueueListener);
-                    opsReady.acquireUninterruptibly();
-                    continue;
-                }
+            final OperationBase firstOp = getNextOp(true);
+            if (firstOp == null) {
+                continue;
             }
+            counter.count = 1;
 
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
-                final ArrayList<Operation> processedQueue = new ArrayList<Operation>();
+                final LinkedList<Operation> processedQueue = new LinkedList<Operation>();
                 try {
-                    store.execute(new Store.VoidCallback<Exception>() {
-                        @Override
-                        public void run(Session session) throws Exception {
-
-                            // Try to execute the operation against the
-                            // session...
-                            try {
-                                firstOp.execute(session);
-                                processedQueue.add(firstOp);
-                            } catch (CancellationException ignore) {
-                            }
-
-                            // See if we can batch up some additional operations
-                            // in
-                            // this transaction.
-
-                            Operation op;
-                            synchronized (opQueue) {
-                                op = opQueue.poll();
-                                if (op != null) {
-                                    try {
-                                        firstOp.execute(session);
-                                        processedQueue.add(op);
-                                    } catch (CancellationException ignore) {
-                                    }
-                                }
-                            }
+
+                    // TODO the recursion here leads to a rather large stack,
+                    // refactor.
+
+                    executeOps(firstOp, processedQueue, counter);
+
+                    // If we processed some ops, flush and post process:
+                    if (!processedQueue.isEmpty()) {
+                        // Sync the store:
+                        store.flush();
+
+                        // Post process operations
+                        long release = 0;
+                        for (Operation processed : processedQueue) {
+                            processed.onCommit();
+                            release += processed.getLimiterSize();
+                        }
+
+                        synchronized (opQueue) {
+                            this.storeLimiter.remove(release);
                         }
-                    }, null);
-                    // Wait for the operations to commit.
-                    for (Operation processed : processedQueue) {
-                        processed.onCommit();
                     }
+
                 } catch (IOException e) {
                     for (Operation processed : processedQueue) {
                         processed.onRollback(e);
@@ -275,19 +337,159 @@
         }
     }
 
+    private final void executeOps(final OperationBase op, final LinkedList<Operation> processedQueue, final OpCounter counter) throws FatalStoreException, Exception {
+        store.execute(new Store.VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+
+                // Try to execute the operation against the
+                // session...
+                try {
+                    if (op.execute(session)) {
+                        processedQueue.add(op);
+                    } else {
+                        counter.count--;
+                    }
+                } catch (CancellationException ignore) {
+                    System.out.println("Cancelled" + op);
+                }
+
+                // See if we can batch up some additional operations
+                // in this transaction.
+                if (counter.count < 100) {
+                    OperationBase next = getNextOp(false);
+                    if (next != null) {
+                        counter.count++;
+                        executeOps(next, processedQueue, counter);
+                    }
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * Adds a queue to the database
+     * 
+     * @param queue
+     *            The queue to add.
+     */
+    public void addQueue(AsciiBuffer queue) {
+        add(new QueueAddOperation(queue), null, false);
+    }
+
+    /**
+     * Deletes a queue and all of its messages from the database
+     * 
+     * @param queue
+     *            The queue to delete.
+     */
+    public void deleteQueue(AsciiBuffer queue) {
+        add(new QueueDeleteOperation(queue), null, false);
+    }
+
+    /**
+     * Saves a message for all of the recipients in the
+     * {@link BrokerMessageDelivery}.
+     * 
+     * @param delivery
+     *            The delivery.
+     * @param source
+     *            The source's controller.
+     * @throws IOException
+     *             If there is an error marshalling the message.
+     * @return The {@link OperationContext} associated with the operation
+     */
+    public OperationContext persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) throws IOException {
+        return add(new AddMessageOperation(delivery), source, delivery.isFlushDelayable());
+    }
+
+    /**
+     * Saves a Message for a single queue.
+     * 
+     * @param delivery
+     *            The delivery
+     * @param queue
+     *            The queue
+     * @param source
+     *            The source initiating the save or null, if there isn't one.
+     * @throws IOException
+     *             If there is an error marshalling the message.
+     * 
+     * @return The {@link OperationContext} associated with the operation
+     */
+    public OperationContext saveMessage(MessageDelivery delivery, AsciiBuffer queue, ISourceController<?> source) throws IOException {
+        return add(new AddMessageOperation(delivery, queue), source, false);
+    }
+
+    /**
+     * Deletes the given message from the store for the given queue.
+     * 
+     * @param delivery
+     *            The delivery.
+     * @param queue
+     *            The queue.
+     * @return The {@link OperationContext} associated with the operation
+     */
+    public OperationContext deleteMessage(MessageDelivery delivery, AsciiBuffer queue) {
+        return add(new DeleteMessageOperation(delivery, queue), null, false);
+    }
+
+    /**
+     * Loads a batch of messages for the specified queue. The loaded messages
+     * are given to the provided {@link MessageRestoreListener}.
+     * <p>
+     * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
+     * message not the store tracking number.
+     * 
+     * @param queue
+     *            The queue for which to load messages
+     * @param first
+     *            The first queue sequence number to load.
+     * @param max
+     *            The maximum number of messages to load.
+     * @param listener
+     *            The listener to which messages should be passed.
+     * @return The {@link OperationContext} associated with the operation
+     */
+    public OperationContext restoreMessages(AsciiBuffer queue, long first, int max, MessageRestoreListener listener) {
+        return add(new RestoreMessageOperation(queue, first, max, listener), null, false);
+    }
+
     private void onDatabaseException(IOException ioe) {
         if (listener != null) {
             listener.onDatabaseException(ioe);
         }
     }
 
+    public interface OperationContext {
+        /**
+         * Attempts to cancel the store operation. Returns true if the operation
+         * could be canceled or false if the operation was already executed by
+         * the store.
+         * 
+         * @return true if the operation could be canceled
+         */
+        public boolean cancel();
+
+        /**
+         * @return true if the operation has been cancelled
+         */
+        public boolean cancelled();
+
+        /**
+         * @return true if the operation has been executed
+         */
+        public boolean executed();
+
+    }
+
     /**
      * This interface is used to execute transacted code.
      * 
      * It is used by the {@link Store#execute(Callback)} method, often as
      * anonymous class.
      */
-    public interface Operation {
+    public interface Operation extends OperationContext {
 
         /**
          * Gets called by the
@@ -299,10 +501,6 @@
          *            provides you access to read and update the persistent
          *            data.
          * @return the result of the CallableCallback
-         * @throws CancellationException
-         *             if the operation has been canceled. If this is thrown,
-         *             the {@link #onCommit()} and {@link #onRollback()} methods
-         *             will not be called.
          * @throws Exception
          *             if an system error occured while executing the
          *             operations.
@@ -310,7 +508,7 @@
          *             if an system error occured while executing the
          *             operations.
          */
-        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
+        public boolean execute(Session session) throws CancellationException, Exception, RuntimeException;
 
         /**
          * Returns true if this operation can be delayed. This is useful in
@@ -324,15 +522,6 @@
         public boolean isDelayable();
 
         /**
-         * Attempts to cancel the store operation. Returns true if the operation
-         * could be canceled or false if the operation was already executed by
-         * the store.
-         * 
-         * @return true if the operation could be canceled
-         */
-        public boolean cancel();
-
-        /**
          * Returns the size to be used when calculating how much space this
          * operation takes on the store processing queue.
          * 
@@ -357,18 +546,59 @@
      * This is a convenience base class that can be used to implement
      * Operations. It handles operation cancellation for you.
      */
-    public abstract class OperationBase implements Operation {
-        final private AtomicBoolean executePending = new AtomicBoolean(true);
+    private abstract class OperationBase implements Operation {
+        public boolean flushRequested = false;
+        public long opSequenceNumber = -1;
+
+        final protected AtomicBoolean executePending = new AtomicBoolean(true);
+        final protected AtomicBoolean cancelled = new AtomicBoolean(false);
+        final protected AtomicBoolean executed = new AtomicBoolean(false);
 
         public boolean cancel() {
-            return executePending.compareAndSet(true, false);
+            if (executePending.compareAndSet(true, false)) {
+                cancelled.set(true);
+                //System.out.println("Cancelled: " + this);
+                synchronized (opQueue) {
+                    opQueue.remove(this);
+                    storeController.elementDispatched(this);
+                }
+                return true;
+            }
+            return cancelled.get();
+        }
+
+        public final boolean cancelled() {
+            return cancelled.get();
         }
 
-        public void execute(Session session) throws CancellationException {
+        public final boolean executed() {
+            return executed.get();
+        }
+
+        /**
+         * Gets called by the
+         * {@link Store#add(Operation, ISourceController, boolean)} method
+         * within a transactional context. If any exception is thrown including
+         * Runtime exception, the transaction is rolled back.
+         * 
+         * @param session
+         *            provides you access to read and update the persistent
+         *            data.
+         * @return True if processed, false otherwise
+         * @throws Exception
+         *             if a system error occurred while executing the
+         *             operations.
+         * @throws RuntimeException
+         *             if a system error occurred while executing the
+         *             operations.
+         */
+        public boolean execute(Session session) throws Exception, RuntimeException {
             if (executePending.compareAndSet(true, false)) {
+                executed.set(true);
                 doExcecute(session);
+                return true;
             } else {
-                throw new CancellationException();
+                return false;
             }
         }
 
@@ -390,15 +620,73 @@
          * operation has been rolled back.
          */
         public void onRollback(Throwable error) {
+            error.printStackTrace();
+        }
+    }
+
+    private class QueueAddOperation extends OperationBase {
+
+        private AsciiBuffer queue;
+
+        QueueAddOperation(AsciiBuffer queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public int getLimiterSize() {
+            // Might consider bumping this up to avoid too much accumulation?
+            return 0;
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+            session.queueAdd(queue);
+        }
+
+        @Override
+        public void onCommit() {
+
+        }
 
+        public String toString() {
+            return "QueueAdd: " + queue.toString();
+        }
+    }
+
+    private class QueueDeleteOperation extends OperationBase {
+
+        private AsciiBuffer queue;
+
+        QueueDeleteOperation(AsciiBuffer queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public int getLimiterSize() {
+            // Might consider bumping this up to avoid too much accumulation?
+            return 0;
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+            session.queueRemove(queue);
+        }
+
+        @Override
+        public void onCommit() {
+
+        }
+        
+        public String toString() {
+            return "QueueDelete: " + queue.toString();
         }
     }
 
     private class DeleteMessageOperation extends OperationBase {
         private final MessageDelivery delivery;
-        private PersistentQueue<MessageDelivery> queue;
+        private AsciiBuffer queue;
 
-        public DeleteMessageOperation(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue) {
+        public DeleteMessageOperation(MessageDelivery delivery, AsciiBuffer queue) {
             this.delivery = delivery;
             this.queue = queue;
         }
@@ -412,7 +700,7 @@
         @Override
         protected void doExcecute(Session session) {
             try {
-                session.queueRemoveMessage(queue.getPeristentQueueName(), delivery.getStoreTracking());
+                session.queueRemoveMessage(queue, delivery.getStoreTracking());
             } catch (KeyNotFoundException e) {
                 // TODO Probably doesn't always mean an error, it is possible
                 // that
@@ -423,23 +711,23 @@
         }
 
         @Override
-        public void onRollback(Throwable error) {
-        }
-
-        @Override
         public void onCommit() {
-            delivery.onMessagePersisted();
+
+        }
+        
+        public String toString() {
+            return "MessageDelete: " + queue.toString() + delivery.getStoreTracking();
         }
     }
 
     private class RestoreMessageOperation extends OperationBase {
-        private PersistentQueue<MessageDelivery> queue;
+        private AsciiBuffer queue;
         private long firstKey;
         private int maxRecords;
         private MessageRestoreListener listener;
         private Collection<RestoredMessage> msgs = null;
 
-        RestoreMessageOperation(PersistentQueue<MessageDelivery> queue, long firstKey, int maxRecords, MessageRestoreListener listener) {
+        RestoreMessageOperation(AsciiBuffer queue, long firstKey, int maxRecords, MessageRestoreListener listener) {
             this.queue = queue;
             this.firstKey = firstKey;
             this.maxRecords = maxRecords;
@@ -451,7 +739,7 @@
 
             Iterator<QueueRecord> records = null;
             try {
-                records = session.queueListMessagesQueue(queue.getPeristentQueueName(), firstKey, maxRecords);
+                records = session.queueListMessagesQueue(queue, firstKey, maxRecords);
 
             } catch (KeyNotFoundException e) {
                 msgs = new ArrayList<RestoredMessage>(0);
@@ -479,13 +767,13 @@
         }
 
         @Override
-        public void onRollback(Throwable error) {
-        }
-
-        @Override
         public void onCommit() {
             listener.messagesRestored(msgs);
         }
+        
+        public String toString() {
+            return "MessageRestore: " + queue.toString() + " first: " + firstKey + " max: " + maxRecords;
+        }
     }
 
     private class AddMessageOperation extends OperationBase {
@@ -493,53 +781,71 @@
         private final BrokerMessageDelivery brokerDelivery;
 
         private final MessageDelivery delivery;
-        private final PersistentQueue<MessageDelivery> target;
+        private final AsciiBuffer target;
+        private final MessageRecord record;
 
-        public AddMessageOperation(BrokerMessageDelivery delivery) {
+        private final boolean delayable;
+
+        public AddMessageOperation(BrokerMessageDelivery delivery) throws IOException {
             this.brokerDelivery = delivery;
             this.delivery = delivery;
             target = null;
+            this.record = delivery.createMessageRecord();
+            this.delayable = delivery.isFlushDelayable();
         }
 
-        public AddMessageOperation(MessageDelivery delivery, PersistentQueue<MessageDelivery> target) {
+        public AddMessageOperation(MessageDelivery delivery, AsciiBuffer target) throws IOException {
             this.brokerDelivery = null;
             this.delivery = delivery;
             this.target = target;
+            this.record = delivery.createMessageRecord();
+            delayable = false;
         }
 
+        public boolean isDelayable() {
+            return delayable;
+        }
+        
         @Override
         public int getLimiterSize() {
             return delivery.getFlowLimiterSize();
         }
-
+        
         @Override
         protected void doExcecute(Session session) {
 
             if (target == null) {
-                MessageRecord record = delivery.createMessageRecord();
-                Long key = session.messageAdd(record);
-                brokerDelivery.beginStore(key);
+                brokerDelivery.beginStore();
+                Collection<AsciiBuffer> targets = brokerDelivery.getPersistentQueues();
+
+                if (!targets.isEmpty()) {
+                    record.setKey(delivery.getStoreTracking());
+                    session.messageAdd(record);
 
-                for (PersistentQueue<MessageDelivery> target : brokerDelivery.getPersistentQueues()) {
-                    try {
-                        QueueRecord queueRecord = new QueueRecord();
-                        queueRecord.setAttachment(null);
-                        queueRecord.setMessageKey(key);
-                        session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
+                    for (AsciiBuffer target : brokerDelivery.getPersistentQueues()) {
+                        try {
+                            QueueRecord queueRecord = new QueueRecord();
+                            queueRecord.setAttachment(null);
+                            queueRecord.setMessageKey(record.getKey());
+                            session.queueAddMessage(target, queueRecord);
 
-                    } catch (KeyNotFoundException e) {
-                        e.printStackTrace();
+                        } catch (KeyNotFoundException e) {
+                            e.printStackTrace();
+                        }
                     }
+                } else {
+                    // Save with no targets must have been cancelled:
+                    // System.out.println("Skipping save for " +
+                    // delivery.getStoreTracking());
                 }
             } else {
 
-                MessageRecord record = delivery.createMessageRecord();
                 Long key = session.messageAdd(record);
                 try {
                     QueueRecord queueRecord = new QueueRecord();
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(key);
-                    session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
+                    session.queueAddMessage(target, queueRecord);
                 } catch (KeyNotFoundException e) {
                     e.printStackTrace();
                 }
@@ -547,15 +853,13 @@
         }
 
         @Override
-        public void onRollback(Throwable error) {
-            error.printStackTrace();
-        }
-
-        @Override
         public void onCommit() {
             delivery.onMessagePersisted();
         }
 
+        public String toString() {
+            return "AddOperation " + delivery.getStoreTracking() + " seq: " + opSequenceNumber + "P/C/E" + super.executePending.get() + "/" + cancelled() + "/" + executed();
+        }
     }
 
     private class RestoredMessageImpl implements RestoredMessage {
@@ -563,8 +867,13 @@
         MessageRecord mRecord;
         ProtocolHandler handler;
 
-        public MessageDelivery getMessageDelivery() {
+        public MessageDelivery getMessageDelivery() throws IOException {
             return handler.createMessageDelivery(mRecord);
         }
     }
+
+    public long allocateStoreTracking() {
+        // Delegates tracking-number allocation to the underlying store.
+        return store.allocateStoreTracking();
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java Mon Mar 30 16:20:28 2009
@@ -1,9 +1,11 @@
 package org.apache.activemq.broker.store;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.queue.PersistentQueue;
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.QueueStoreHelper;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.broker.MessageDelivery;
@@ -14,24 +16,29 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
 
+
 public class MessageDeliveryStoreHelper implements QueueStoreHelper<MessageDelivery>, Dispatchable, BrokerDatabase.MessageRestoreListener {
 
     private final BrokerDatabase database;
-    private final PersistentQueue<MessageDelivery> queue;
+    private final AsciiBuffer queue;
     private final DispatchContext dispatchContext;
     private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
     private final IFlowRelay<MessageDelivery> restoreRelay;
     private final SizeLimiter<MessageDelivery> restoreLimiter;
     private final IFlowController<MessageDelivery> controller;
     private final FlowUnblockListener<MessageDelivery> unblockListener;
+    private final IFlowSink<MessageDelivery> targetSink;
 
     private int RESTORE_BATCH_SIZE = 50;
     
-    private boolean restoreComplete;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean restoreComplete = new AtomicBoolean(false);
+    private AtomicBoolean storeLoaded = new AtomicBoolean(false);
 
     private static enum State {
         STOPPED, RESTORING, RESTORED
@@ -39,10 +46,11 @@
 
     private State state = State.RESTORING;
 
-    MessageDeliveryStoreHelper(BrokerDatabase database, PersistentQueue<MessageDelivery> queue, IDispatcher dispatcher) {
+    MessageDeliveryStoreHelper(BrokerDatabase database, AsciiBuffer queueName, IFlowSink<MessageDelivery> sink, IDispatcher dispatcher) {
         this.database = database;
-        this.queue = queue;
-        Flow flow = new Flow("MessageRestorer-" + queue.getPeristentQueueName(), false);
+        this.queue = queueName;
+        this.targetSink = sink;
+        Flow flow = new Flow("MessageRestorer-" + queue, false);
         restoreLimiter = new SizeLimiter<MessageDelivery>(1000, 500) {
             @Override
             public int getElementSize(MessageDelivery msg) {
@@ -64,17 +72,16 @@
         elem.delete(queue);
     }
 
-    public void save(MessageDelivery elem, boolean flush) {
-        elem.persist(queue);
+    public void save(MessageDelivery elem, boolean flush) throws IOException {
+        elem.persist(queue, !flush);
     }
 
     public boolean hasStoredElements() {
-        // TODO Auto-generated method stub
-        return false;
+        return !restoreComplete.get();
     }
 
     public void startLoadingQueue() {
-        // TODO Auto-generated method stub
+        database.restoreMessages(queue, 0, RESTORE_BATCH_SIZE, this);
     }
 
     public void stopLoadingQueue() {
@@ -84,7 +91,7 @@
     public boolean dispatch() {
 
         RestoredMessage restored = restoredMsgs.poll();
-        if (restored == null || restoreComplete) {
+        if (restored == null || restoreComplete.get()) {
             return true;
         }
 
@@ -93,20 +100,24 @@
                 return true;
             }
         } else {
-            queue.addFromStore(restored.getMessageDelivery(), controller);
+            try {
+                targetSink.add(restored.getMessageDelivery(), controller);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
         }
 
         return false;
     }
 
     public void messagesRestored(Collection<RestoredMessage> msgs) {
-        synchronized (restoredMsgs) {
-            if (!msgs.isEmpty()) {
-                restoredMsgs.addAll(msgs);
-            } else {
-
-            }
+        if (!msgs.isEmpty()) {
+            restoredMsgs.addAll(msgs);
+        } else {
+            storeLoaded.set(true);
         }
+        dispatchContext.requestDispatch();
 
         dispatchContext.requestDispatch();
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Mon Mar 30 16:20:28 2009
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.broker.store;
 
+import java.io.File;
 import java.util.Iterator;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
-
 /**
  * Interface to persistently store and access data needed by the messaging
  * system.
@@ -47,7 +47,7 @@
             super(cause);
         }
     }
-    
+
     public class DuplicateKeyException extends Exception {
         private static final long serialVersionUID = -477567614452245482L;
 
@@ -57,6 +57,7 @@
         public DuplicateKeyException(String message) {
             super(message);
         }
+
         public DuplicateKeyException(String message, Throwable cause) {
             super(message, cause);
         }
@@ -85,7 +86,29 @@
             super(cause);
         }
     }
-    
+
+    /**
+     * Sets the store's root directory.
+     * 
+     * @param directory
+     *            The root directory
+     * 
+     */
+    public void setStoreDirectory(File directory);
+
+    /**
+     * Indicates that all messages should be deleted on startup
+     * 
+     * @param val
+     *            True if all messages should be deleted on startup
+     */
+    public void setDeleteAllMessages(boolean val);
+
+    /**
+     * @return a unique sequential store tracking number.
+     */
+    public long allocateStoreTracking();
+
     /**
      * This interface is used to execute transacted code.
      * 
@@ -95,61 +118,72 @@
     public interface Callback<R, T extends Exception> {
 
         /**
-         * Gets called by the {@link Store#execute(Callback)} method
-         * within a transactional context. If any exception is thrown including
-         * Runtime exception, the transaction is rolled back.
+         * Gets called by the {@link Store#execute(Callback)} method within a
+         * transactional context. If any exception is thrown including Runtime
+         * exception, the transaction is rolled back.
          * 
          * @param session
          *            provides you access to read and update the persistent
          *            data.
          * @return the result of the Callback
          * @throws T
-         *            if an system error occured while executing the
-         *            operations.
+         *             if a system error occurred while executing the
+         *             operations.
          */
         public R execute(Session session) throws T;
     }
-    
+
     /**
-     * Convenience class which allows you to implement {@link Callback} classes which do not return a value.
+     * Convenience class which allows you to implement {@link Callback} classes
+     * which do not return a value.
      */
-    public abstract class VoidCallback <T extends Exception> implements Callback<Object, T> {
-        
+    public abstract class VoidCallback<T extends Exception> implements Callback<Object, T> {
+
         /**
-         * Gets called by the {@link Store#execute(VoidCallback)} method within a transactional context.  
-         * If any exception is thrown including Runtime exception, the transaction is rolled back.
+         * Gets called by the {@link Store#execute(VoidCallback)} method within
+         * a transactional context. If any exception is thrown including Runtime
+         * exception, the transaction is rolled back.
          * 
-         * @param session provides you access to read and update the persistent data.
-         * @throws T if an error occurs and the transaction should get rolled back
+         * @param session
+         *            provides you access to read and update the persistent
+         *            data.
+         * @throws T
+         *             if an error occurs and the transaction should get rolled
+         *             back
          */
         abstract public void run(Session session) throws T;
-        
+
         final public Object execute(Session session) throws T {
             run(session);
             return null;
         }
     }
-    
+
     public static class QueueRecord {
         Long queueKey;
         Long messageKey;
         Buffer attachment;
-        
+
         public Long getQueueKey() {
             return queueKey;
         }
+
         public void setQueueKey(Long queueKey) {
             this.queueKey = queueKey;
         }
+
         public Long getMessageKey() {
             return messageKey;
         }
+
         public void setMessageKey(Long messageKey) {
             this.messageKey = messageKey;
         }
+
         public Buffer getAttachment() {
             return attachment;
         }
+
         public void setAttachment(Buffer attachment) {
             this.attachment = attachment;
         }
@@ -157,62 +191,76 @@
 
     // Message related methods.
     public static class MessageRecord {
-        Long key;
+        Long key = (long) -1;
         AsciiBuffer messageId;
         AsciiBuffer encoding;
         Buffer buffer;
         Long streamKey;
-        
+
         public Long getKey() {
             return key;
         }
+
         public void setKey(Long key) {
             this.key = key;
         }
+
         public AsciiBuffer getMessageId() {
             return messageId;
         }
+
         public void setMessageId(AsciiBuffer messageId) {
             this.messageId = messageId;
         }
+
         public AsciiBuffer getEncoding() {
             return encoding;
         }
+
         public void setEncoding(AsciiBuffer encoding) {
             this.encoding = encoding;
         }
+
         public Buffer getBuffer() {
             return buffer;
         }
+
         public void setBuffer(Buffer buffer) {
             this.buffer = buffer;
         }
+
         public Long getStreamKey() {
             return streamKey;
         }
+
         public void setStreamKey(Long stream) {
             this.streamKey = stream;
         }
     }
-    
+
     /**
-     * Executes user supplied {@link Callback}.  If the {@link Callback} does not throw
-     * any Exceptions, all updates to the store are committed to the store as a single 
-     * unit of work, otherwise they are rolled back. 
+     * Executes user supplied {@link Callback}. If the {@link Callback} does not
+     * throw any Exceptions, all updates to the store are committed to the store
+     * as a single unit of work, otherwise they are rolled back.
      * 
-     * When this method returns, the transaction may be buffered by the Store implementation
-     * it increase performance throughput.  The onFlush parameter can be used to know when
-     * the transaction does get flushed is guaranteed to not be lost if a system crash occurs.
+     * When this method returns, the transaction may be buffered by the Store
+     * implementation to increase performance throughput. The onFlush parameter
+     * can be used to know when the transaction has been flushed and is
+     * guaranteed not to be lost if a system crash occurs.
      * 
-     * You can force the flushing of all previously buffered transactions using the {@link #flush} method.
+     * You can force the flushing of all previously buffered transactions using
+     * the {@link #flush} method.
      * 
-     * Any exceptions thrown by the  {@link Callback} are propagated by this method.
+     * Any exceptions thrown by the {@link Callback} are propagated by this
+     * method.
      * 
      * @param <T>
      * @param closure
-     * @param onFlush if not null, it's {@link Runnable#run()} method is called once he transaction has been store on disk.
+     * @param onFlush
+     *            if not null, its {@link Runnable#run()} method is called once
+     *            the transaction has been stored on disk.
      */
-    public <R, T extends Exception> R execute(Callback<R,T> callback, Runnable onFlush) throws T, FatalStoreException;
+    public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable onFlush) throws T, FatalStoreException;
 
     /**
      * Flushes all committed buffered transactions.
@@ -227,41 +275,59 @@
      * 
      */
     public interface Session {
-        
+
         public Long messageAdd(MessageRecord message);
-        public Long messageGetKey(AsciiBuffer messageId);
+
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
 
         public Long streamOpen();
+
         public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
+
         public void streamClose(Long streamKey) throws KeyNotFoundException;
+
         public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+
         public boolean streamRemove(Long streamKey);
 
         // Transaction related methods.
         public Iterator<Buffer> transactionList(Buffer first, int max);
+
         public void transactionAdd(Buffer txid);
+
         public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+
         public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+
         public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+
         public void transactionRollback(Buffer txid) throws KeyNotFoundException;
-        
+
         // Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
+
         public void queueAdd(AsciiBuffer queueName);
+
         public void queueRemove(AsciiBuffer queueName);
-        
 
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException;
+
+        public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+
         public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
+
         public boolean mapAdd(AsciiBuffer map);
+
         public boolean mapRemove(AsciiBuffer map);
+
         public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+
         public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+
         public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
 
     }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java Mon Mar 30 16:20:28 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+
+import org.apache.activemq.util.FactoryFinder;
+
+public class StoreFactory {
+
+    static private final FactoryFinder STORE_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/store/");
+
+    public static Store createStore(String type) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
+        return (Store) STORE_FINDER.newInstance(type);
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Mon Mar 30 16:20:28 2009
@@ -42,28 +42,35 @@
         public DestinationEntity readPayload(DataInput dataIn) throws IOException {
             DestinationEntity value = new DestinationEntity();
             value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
+            value.trackingIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
             return value;
         }
 
         public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
             dataOut.writeLong(value.queueIndex.getPageId());
+            dataOut.writeLong(value.trackingIndex.getPageId());
         }
     };
 
     private long nextQueueKey;
     private BTreeIndex<Long, QueueRecord> queueIndex;
-
+    private BTreeIndex<Long, Long> trackingIndex;
+    
     ///////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
     ///////////////////////////////////////////////////////////////////
     public void allocate(Transaction tx) throws IOException {
         queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
+        trackingIndex = new BTreeIndex<Long, Long>(tx.allocate());
     }
     
     public void deallocate(Transaction tx) throws IOException {
         queueIndex.clear(tx);
+        trackingIndex.clear(tx);
+        tx.free(trackingIndex.getPageId());
         tx.free(queueIndex.getPageId());
         queueIndex=null;
+        trackingIndex=null;
     }
     
     public void load(Transaction tx) throws IOException {
@@ -80,6 +87,14 @@
                 nextQueueKey = lastEntry.getKey()+1;
             }
         }
+        
+        if( trackingIndex.getPageFile()==null ) {
+            
+            trackingIndex.setPageFile(tx.getPageFile());
+            trackingIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+            trackingIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+            trackingIndex.load(tx);
+        }
     }
     
     ///////////////////////////////////////////////////////////////////
@@ -95,10 +110,17 @@
         value.setMessageKey(command.getMessageKey());
         value.setQueueKey(command.getQueueKey());
         queueIndex.put(tx, value.getQueueKey(), value);
+        trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
     }
 
-    public void remove(Transaction tx, long queueKey) throws IOException {
-        queueIndex.remove(tx, queueKey);
+    public boolean remove(Transaction tx, long msgKey) throws IOException {
+        Long queueKey = trackingIndex.remove(tx, msgKey);
+        if(queueKey != null)
+        {
+            queueIndex.remove(tx, queueKey);
+            return true;
+        }
+        return false;
     }
 
     public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
@@ -121,5 +143,9 @@
         return rc.iterator();
     }
 
+    public Iterator<Entry<Long, Long>> listTrackingNums(Transaction tx) throws IOException {
+        return trackingIndex.iterator(tx);
+    }
+
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Mar 30 16:20:28 2009
@@ -27,6 +27,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.store.Store;
@@ -97,6 +98,7 @@
     private LockFile lockFile;
     private Location nextRecoveryPosition;
     private Location lastRecoveryPosition;
+    private AtomicLong trackingGen = new AtomicLong(0);
 
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
@@ -122,6 +124,13 @@
         }
     }
 
+    /**
+     * @return a unique sequential store tracking number.
+     */
+    public long allocateStoreTracking() {
+        return trackingGen.incrementAndGet();
+    }
+
     private void loadPageFile() throws IOException {
         indexLock.writeLock().lock();
         try {
@@ -703,7 +712,6 @@
      * @return
      * @throws IOException
      */
-    @SuppressWarnings("unchecked")
     public TypeCreatable load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         return load(location, data);
@@ -772,13 +780,16 @@
         DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
         if (destination != null) {
             destination.add(tx, command);
+            rootEntity.addMessageRef(tx, command.getQueueName(), command.getMessageKey());
         }
     }
 
     private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
         DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
         if (destination != null) {
-            destination.remove(tx, command.getQueueKey());
+            if (destination.remove(tx, command.getMessageKey())) {
+                rootEntity.removeMessageRef(tx, command.getQueueName(), command.getMessageKey());
+            }
         }
     }
 
@@ -852,10 +863,6 @@
             return id;
         }
 
-        public Long messageGetKey(AsciiBuffer messageId) {
-            return rootEntity.messageGetKey(tx(), messageId);
-        }
-
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
             Location location = rootEntity.messageGetLocation(tx(), key);
             if (location == null) {
@@ -911,9 +918,9 @@
             return queueKey;
         }
 
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+        public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
-            bean.setQueueKey(queueKey);
+            bean.setMessageKey(messageKey);
             bean.setQueueName(queueName);
             updates.add(bean);
         }
@@ -1026,11 +1033,11 @@
                     }
                 });
             }
-            
-            // Keep trying waiting for the flush to happen unless the store 
+
+            // Keep trying waiting for the flush to happen unless the store
             // has been stopped.
-            while(started.get()) {
-                if( done.await(100, TimeUnit.MILLISECONDS) ) {
+            while (started.get()) {
+                if (done.await(100, TimeUnit.MILLISECONDS)) {
                     return;
                 }
             }
@@ -1061,7 +1068,7 @@
         return directory;
     }
 
-    public void setDirectory(File directory) {
+    public void setStoreDirectory(File directory) {
         this.directory = directory;
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Mar 30 16:20:28 2009
@@ -36,7 +36,7 @@
 import org.apache.kahadb.util.Marshaller;
 
 public class RootEntity {
-    
+
     public final static Marshaller<RootEntity> MARSHALLER = new Marshaller<RootEntity>() {
         public Class<RootEntity> getType() {
             return RootEntity.class;
@@ -45,10 +45,10 @@
         public RootEntity readPayload(DataInput is) throws IOException {
             RootEntity rc = new RootEntity();
             rc.state = is.readInt();
-            rc.messageKeyIndex = new BTreeIndex<Long, MessageKeys>(is.readLong());
-            rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
-            rc.messageIdIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
+            rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
+            //rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
+            rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
             if (is.readBoolean()) {
                 rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
             } else {
@@ -60,9 +60,9 @@
         public void writePayload(RootEntity object, DataOutput os) throws IOException {
             os.writeInt(object.state);
             os.writeLong(object.messageKeyIndex.getPageId());
-            os.writeLong(object.locationIndex.getPageId());
-            os.writeLong(object.messageIdIndex.getPageId());
+            //os.writeLong(object.locationIndex.getPageId());
             os.writeLong(object.destinationIndex.getPageId());
+            os.writeLong(object.messageRefsIndex.getPageId());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
                 Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
@@ -82,116 +82,140 @@
 
     // Message Indexes
     private long nextMessageKey;
-    private BTreeIndex<Long, MessageKeys> messageKeyIndex;
-    private BTreeIndex<Location, Long> locationIndex;
-    private BTreeIndex<AsciiBuffer, Long> messageIdIndex;
+    private BTreeIndex<Long, Location> messageKeyIndex;
+    //private BTreeIndex<Location, Long> locationIndex;
+    private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to its reference count.
 
     // The destinations
     private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
     private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
-    ///////////////////////////////////////////////////////////////////
-    
+    // /////////////////////////////////////////////////////////////////
+
     public void allocate(Transaction tx) throws IOException {
         // First time this is created.. Initialize a new pagefile.
         Page<RootEntity> page = tx.allocate();
         pageId = page.getPageId();
         assert pageId == 0;
-        
+
         state = KahaDBStore.CLOSED_STATE;
-        
-        messageKeyIndex = new BTreeIndex<Long, MessageKeys>(tx.getPageFile(), tx.allocate().getPageId());
-        locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(), tx.allocate().getPageId());
-        messageIdIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
+
+        messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
+        //locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(), tx.allocate().getPageId());
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
+        messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
 
         page.set(this);
         tx.store(page, MARSHALLER, true);
     }
-    
+
     public void load(Transaction tx) throws IOException {
         messageKeyIndex.setPageFile(tx.getPageFile());
         messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-        messageKeyIndex.setValueMarshaller(MessageKeys.MARSHALLER);
+        messageKeyIndex.setValueMarshaller(Marshallers.LOCATION_MARSHALLER);
         messageKeyIndex.load(tx);
 
-        locationIndex.setPageFile(tx.getPageFile());
-        locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
-        locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        locationIndex.load(tx);
-
-        messageIdIndex.setPageFile(tx.getPageFile());
-        messageIdIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        messageIdIndex.load(tx);
-        
+        //locationIndex.setPageFile(tx.getPageFile());
+        //locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
+        //locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        //locationIndex.load(tx);
+
         destinationIndex.setPageFile(tx.getPageFile());
         destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
         destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
         destinationIndex.load(tx);
-        
+
+        messageRefsIndex.setPageFile(tx.getPageFile());
+        messageRefsIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+        messageRefsIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        messageRefsIndex.load(tx);
+
         // Keep the StoredDestinations loaded
         destinations.clear();
         for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
             Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
             entry.getValue().load(tx);
             destinations.put(entry.getKey(), entry.getValue());
-        }        
+        }
     }
-    
+
     public void store(Transaction tx) throws IOException {
         Page<RootEntity> page = tx.load(pageId, null);
         page.set(this);
         tx.store(page, RootEntity.MARSHALLER, true);
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Message Methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     public Long nextMessageKey() {
         return nextMessageKey++;
     }
 
     public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
         long id = command.getMessageKey();
-        Long previous = locationIndex.put(tx, location, id);
-        if( previous == null ) {
-            messageIdIndex.put(tx, command.getMessageId(), id);
-            messageKeyIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
-        } else {
-            // Message existed.. undo the index update we just did.  Chances
+        Location previous = messageKeyIndex.put(tx, id, location);
+        if (previous != null) {
+            // Message existed.. undo the index update we just did. Chances
             // are it's a transaction replay.
-            locationIndex.put(tx, location, previous);
+            messageKeyIndex.put(tx, id, previous);
         }
     }
+    
+    public void messageRemove(Transaction tx, Long messageKey) throws IOException {
+        //Location location = messageKeyIndex.remove(tx, messageKey);
+        messageKeyIndex.remove(tx, messageKey);
+        //if (location != null) {
+        //    locationIndex.remove(tx, location);
+        //}
+    }
 
-    public Long messageGetKey(Transaction tx, AsciiBuffer messageId) {
+    public Location messageGetLocation(Transaction tx, Long messageKey) {
         try {
-            return messageIdIndex.get(tx, messageId);
+            return messageKeyIndex.get(tx, messageKey);
         } catch (IOException e) {
             throw new Store.FatalStoreException(e);
         }
     }
-    
-    public Location messageGetLocation(Transaction tx, Long messageKey) {
+
+    public void addMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
         try {
-            MessageKeys t = messageKeyIndex.get(tx, messageKey);
-            if( t==null ) {
-                return null;
+            Long refs = messageRefsIndex.get(tx, messageKey);
+            if (refs == null) {
+                messageRefsIndex.put(tx, messageKey, new Long(1));
+            } else {
+                messageRefsIndex.put(tx, messageKey, new Long(1 + refs.longValue()));
             }
-            return t.location;
         } catch (IOException e) {
             throw new Store.FatalStoreException(e);
         }
+
     }
 
-    ///////////////////////////////////////////////////////////////////
+    public void removeMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
+        try {
+            Long refs = messageRefsIndex.get(tx, messageKey);
+            if (refs != null) {
+                if (refs.longValue() <= 1) {
+                    messageRefsIndex.remove(tx, messageKey);
+                    // If this was the last reference, remove the message.
+                    messageRemove(tx, messageKey);
+                } else {
+                    messageRefsIndex.put(tx, messageKey, new Long(refs.longValue() - 1));
+                }
+            }
+        } catch (IOException e) {
+            throw new Store.FatalStoreException(e);
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
     // Queue Methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     public void queueAdd(Transaction tx, AsciiBuffer queueName) throws IOException {
-        if( destinationIndex.get(tx, queueName)==null ) {
+        if (destinationIndex.get(tx, queueName) == null) {
             DestinationEntity rc = new DestinationEntity();
             rc.allocate(tx);
             destinationIndex.put(tx, queueName, rc);
@@ -202,7 +226,15 @@
 
     public void queueRemove(Transaction tx, AsciiBuffer queueName) throws IOException {
         DestinationEntity destination = destinations.get(queueName);
-        if( destination!=null ) {
+        if (destination != null) {
+            // Release this queue's reference on every message it still tracks.
+            // TODO: this should probably be optimized.
+            Iterator<Entry<Long, Long>> messages = destination.listTrackingNums(tx);
+            while(messages.hasNext())
+            {
+                Long messageKey = messages.next().getKey();
+                removeMessageRef(tx, queueName, messageKey);
+            }
             destinationIndex.remove(tx, queueName);
             destinations.remove(queueName);
             destination.deallocate(tx);
@@ -212,16 +244,16 @@
     public DestinationEntity getDestination(AsciiBuffer queueName) {
         return destinations.get(queueName);
     }
-    
+
     public Iterator<AsciiBuffer> queueList(Transaction tx, AsciiBuffer firstQueueName, int max) {
         return list(destinations, firstQueueName, max);
     }
-    
-    static private <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+
+    static private <Key, Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
         ArrayList<Key> rc = new ArrayList<Key>(max);
-        Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+        Set<Key> keys = (first == null ? map : map.tailMap(first)).keySet();
         for (Key buffer : keys) {
-            if( rc.size() >= max ) {
+            if (rc.size() >= max) {
                 break;
             }
             rc.add(buffer);
@@ -253,5 +285,4 @@
         this.lastUpdate = lastUpdate;
     }
 
-
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Mon Mar 30 16:20:28 2009
@@ -21,6 +21,9 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import java.io.File;
 
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
@@ -28,30 +31,38 @@
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 
+
 /**
- * An in memory implementation of the {@link Store} interface.
- * It does not properly roll back operations if an error occurs in 
- * the middle of a transaction and it does not persist changes across
- * restarts.
+ * An in memory implementation of the {@link Store} interface. It does not
+ * properly roll back operations if an error occurs in the middle of a
+ * transaction and it does not persist changes across restarts.
  */
 public class MemoryStore implements Store {
 
     private MemorySession session = new MemorySession();
+    private AtomicLong trackingGen = new AtomicLong(0);
+
+    /**
+     * @return a unique sequential store tracking number.
+     */
+    public long allocateStoreTracking() {
+        return trackingGen.incrementAndGet();
+    }
 
     static private class Stream {
 
         private ByteArrayOutputStream baos = new ByteArrayOutputStream();
         private ByteSequence data;
-        
+
         public void write(Buffer buffer) {
-            if( baos == null ) {
+            if (baos == null) {
                 throw new IllegalStateException("Stream closed.");
             }
             baos.write(buffer.data, buffer.offset, buffer.length);
         }
 
         public void close() {
-            if( baos == null ) {
+            if (baos == null) {
                 throw new IllegalStateException("Stream closed.");
             }
             data = baos.toByteSequence();
@@ -59,38 +70,46 @@
         }
 
         public Buffer read(int offset, int max) {
-            if( data == null ) {
+            if (data == null) {
                 throw new IllegalStateException("Stream not closed.");
             }
-            if( offset > data.length ) {
+            if (offset > data.length) {
                 // Invalid offset.
                 return new Buffer(data.data, 0, 0);
             }
             offset += data.offset;
-            max = Math.min(max, data.length-offset );
+            max = Math.min(max, data.length - offset);
             return new Buffer(data.data, offset, max);
         }
-        
+
     }
-    
+
     static private class StoredQueue {
         long sequence;
         TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
+        // Maps a record's message key (tracking number) to its queue sequence key.
+        HashMap<Long, Long> trackingMap = new HashMap<Long, Long>();
 
         public Long add(QueueRecord record) {
-            Long key = ++sequence;
-            record.setQueueKey(key);
-            records.put(key, record);
-            return key;
+            long sequenceKey = ++sequence;
+            record.setQueueKey(sequenceKey);
+            records.put(sequenceKey, record);
+            trackingMap.put(record.getMessageKey(), record.getQueueKey());
+            return sequenceKey;
         }
 
-        public void remove(Long queueKey) {
-            records.remove(queueKey);            
+        public boolean remove(Long msgKey) {
+            Long sequenceKey = trackingMap.remove(msgKey);
+            if (sequenceKey != null) {
+                records.remove(sequenceKey);
+                return true;
+            }
+            return false;
         }
 
         public Iterator<QueueRecord> list(Long firstQueueKey, int max) {
             ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
-            for (Long key : records.tailMap(firstQueueKey).keySet() ) {
+            for (Long key : records.tailMap(firstQueueKey).keySet()) {
                 if (list.size() >= max) {
                     break;
                 }
@@ -99,18 +118,17 @@
             return list.iterator();
         }
     }
-    
-    
+
     static private class RemoveOp {
         AsciiBuffer queue;
         Long messageKey;
-        
+
         public RemoveOp(AsciiBuffer queue, Long messageKey) {
             this.queue = queue;
             this.messageKey = messageKey;
         }
     }
-    
+
     static private class Transaction {
         private ArrayList<Long> adds = new ArrayList<Long>(100);
         private ArrayList<RemoveOp> removes = new ArrayList<RemoveOp>(100);
@@ -120,49 +138,70 @@
                 session.queueRemoveMessage(op.queue, op.messageKey);
             }
         }
+
         public void rollback(MemorySession session) {
             for (Long op : adds) {
                 session.messageRemove(op);
             }
         }
+
         public void addMessage(Long messageKey) {
             adds.add(messageKey);
         }
+
         public void removeMessage(AsciiBuffer queue, Long messageKey) {
             removes.add(new RemoveOp(queue, messageKey));
         }
     }
-    
+
+    private static class MessageRecordHolder {
+        final MessageRecord record;
+        int refs = 0;
+
+        public MessageRecordHolder(MessageRecord record) {
+            this.record = record;
+        }
+    }
+
     private class MemorySession implements Session {
-        
-        long messageSequence;
+
         long streamSequence;
-        
-        private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
-        private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
-        private TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<AsciiBuffer,Buffer>>();
+
+        private HashMap<Long, MessageRecordHolder> messages = new HashMap<Long, MessageRecordHolder>();
+
+        private TreeMap<AsciiBuffer, TreeMap<AsciiBuffer, Buffer>> maps = new TreeMap<AsciiBuffer, TreeMap<AsciiBuffer, Buffer>>();
         private TreeMap<Long, Stream> streams = new TreeMap<Long, Stream>();
         private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
         private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
-        
+
         // //////////////////////////////////////////////////////////////////////////////
         // Message related methods.
         // ///////////////////////////////////////////////////////////////////////////////
         public Long messageAdd(MessageRecord record) {
-            Long key = ++messageSequence;
-            record.setKey(key);
-            messages.put(key, record);
-            messagesKeys.put(record.getMessageId(), key);
+            long key = record.getKey();
+            if (key < 0) {
+                throw new IllegalArgumentException("Key not set");
+            }
+            MessageRecordHolder holder = new MessageRecordHolder(record);
+            MessageRecordHolder old = messages.put(key, holder);
+            if (old != null) {
+                messages.put(key, old);
+            }
+
+            // messagesKeys.put(record.getMessageId(), key);
             return key;
         }
+
         public void messageRemove(Long key) {
             messages.remove(key);
         }
-        public Long messageGetKey(AsciiBuffer messageId) {
-            return messagesKeys.get(messageId);
-        }
+
         public MessageRecord messageGetRecord(Long key) {
-            return messages.get(key);
+            MessageRecordHolder holder = messages.get(key);
+            if (holder != null) {
+                return holder.record;
+            }
+            return null;
         }
 
         // //////////////////////////////////////////////////////////////////////////////
@@ -175,21 +214,40 @@
                 queues.put(queueName, queue);
             }
         }
+
         public void queueRemove(AsciiBuffer queueName) {
             StoredQueue queue = queues.get(queueName);
             if (queue != null) {
                 queues.remove(queueName);
             }
         }
+
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
             return list(queues, firstQueueName, max);
         }
+
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
-            return get(queues, queueName).add(record);
+            Long sequenceKey = get(queues, queueName).add(record);
+            MessageRecordHolder holder = messages.get(record.getMessageKey());
+            if (holder != null) {
+                holder.refs++;
+            }
+            return sequenceKey;
+
         }
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
-            get(queues, queueName).remove(queueKey);
+
+        public void queueRemoveMessage(AsciiBuffer queueName, Long msgKey) throws KeyNotFoundException {
+            if (get(queues, queueName).remove(msgKey)) {
+                MessageRecordHolder holder = messages.get(msgKey);
+                if (holder != null) {
+                    holder.refs--;
+                    if (holder.refs <= 0) {
+                        messages.remove(msgKey);
+                    }
+                }
+            }
         }
+
         public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
             return get(queues, queueName).list(firstQueueKey, max);
         }
@@ -199,27 +257,33 @@
         // data.
         // ///////////////////////////////////////////////////////////////////////////////
         public boolean mapAdd(AsciiBuffer mapName) {
-            if( maps.containsKey(mapName) ) {
+            if (maps.containsKey(mapName)) {
                 return false;
             }
             maps.put(mapName, new TreeMap<AsciiBuffer, Buffer>());
             return true;
         }
+
         public boolean mapRemove(AsciiBuffer mapName) {
-            return maps.remove(mapName)!=null;
+            return maps.remove(mapName) != null;
         }
+
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
             return list(maps, first, max);
-        }        
+        }
+
         public Buffer mapEntryGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
             return get(maps, mapName).get(key);
         }
+
         public Buffer mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
             return get(maps, mapName).remove(key);
         }
+
         public Buffer mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
             return get(maps, mapName).put(key, value);
         }
+
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
             return list(get(maps, mapName), first, max);
         }
@@ -232,17 +296,21 @@
             streams.put(id, new Stream());
             return id;
         }
+
         public void streamWrite(Long streamKey, Buffer buffer) throws KeyNotFoundException {
             get(streams, streamKey).write(buffer);
         }
+
         public void streamClose(Long streamKey) throws KeyNotFoundException {
             get(streams, streamKey).close();
         }
+
         public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
             return get(streams, streamKey).read(offset, max);
         }
+
         public boolean streamRemove(Long streamKey) {
-            return streams.remove(streamKey)!=null;
+            return streams.remove(streamKey) != null;
         }
 
         // ///////////////////////////////////////////////////////////////////////////////
@@ -251,22 +319,38 @@
         public void transactionAdd(Buffer txid) {
             transactions.put(txid, new Transaction());
         }
+
         public void transactionCommit(Buffer txid) throws KeyNotFoundException {
             remove(transactions, txid).commit(this);
         }
+
         public void transactionRollback(Buffer txid) throws KeyNotFoundException {
             remove(transactions, txid).rollback(this);
         }
+
         public Iterator<Buffer> transactionList(Buffer first, int max) {
             return list(transactions, first, max);
         }
+
         public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
             get(transactions, txid).addMessage(messageKey);
+            MessageRecordHolder holder = messages.get(messageKey);
+            if (holder != null) {
+                holder.refs++;
+            }
         }
+
         public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) throws KeyNotFoundException {
             get(transactions, txid).removeMessage(queue, messageKey);
+            MessageRecordHolder holder = messages.get(messageKey);
+            if (holder != null) {
+                holder.refs--;
+                if (holder.refs <= 0) {
+                    messages.remove(messageKey);
+                }
+            }
         }
-        
+
     }
 
     public void start() throws Exception {
@@ -277,7 +361,7 @@
 
     public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {
         R rc = callback.execute(session);
-        if( runnable!=null ) {
+        if (runnable != null) {
             runnable.run();
         }
         return rc;
@@ -285,12 +369,12 @@
 
     public void flush() {
     }
-    
-    static private <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+
+    static private <Key, Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
         ArrayList<Key> rc = new ArrayList<Key>(max);
-        Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+        Set<Key> keys = (first == null ? map : map.tailMap(first)).keySet();
         for (Key buffer : keys) {
-            if( rc.size() >= max ) {
+            if (rc.size() >= max) {
                 break;
             }
             rc.add(buffer);
@@ -298,19 +382,29 @@
         return rc.iterator();
     }
 
-    static private <Key,Value> Value get(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+    static private <Key, Value> Value get(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
         Value value = map.get(key);
-        if( value == null ) {
+        if (value == null) {
             throw new KeyNotFoundException(key.toString());
         }
         return value;
     }
-    static private <Key,Value> Value remove(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
+
+    static private <Key, Value> Value remove(TreeMap<Key, Value> map, Key key) throws KeyNotFoundException {
         Value value = map.remove(key);
-        if( value == null ) {
+        if (value == null) {
             throw new KeyNotFoundException(key.toString());
         }
         return value;
     }
 
+    public void setStoreDirectory(File directory) {
+        // NOOP
+    }
+
+    public void setDeleteAllMessages(boolean val) {
+        // TODO Auto-generated method stub
+        
+    }
+
 }



Mime
View raw message