activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r769099 [2/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 27 Apr 2009 18:40:49 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Apr 27 18:40:44 2009
@@ -22,6 +22,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +37,7 @@
 import org.apache.activemq.broker.store.Store.FatalStoreException;
 import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -45,12 +48,16 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.RestoreListener;
+import org.apache.activemq.queue.QueueStore.RestoredElement;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.Operation> {
 
+    private static final boolean DEBUG = false;
     private final Store store;
     private final Flow databaseFlow = new Flow("database", false);
 
@@ -68,13 +75,12 @@
     private AtomicBoolean notify = new AtomicBoolean(false);
     private Semaphore opsReady = new Semaphore(0);
     private long opSequenceNumber;
-    private long flushPointer = 0; // The last seq num for which flush was
-    // requested
+    private long flushPointer = -1; // The last seq num for which flush was requested    
     private long requestedDelayedFlushPointer = -1; // Set to the last sequence
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 100;
+    private final long FLUSH_DELAY_MS = 50;
     private final Runnable flushDelayCallback;
 
     public interface DatabaseListener {
@@ -87,26 +93,11 @@
         public void onDatabaseException(IOException ioe);
     }
 
-    /**
-     * Holder of a restored message to be passed to a
-     * {@link MessageRestoreListener}. This allows the demarshalling to be done
-     * by the listener instead of the the database worker.
-     * 
-     * @author cmacnaug
-     */
-    public interface RestoredMessage {
-        MessageDelivery getMessageDelivery() throws IOException;
-    }
-
-    public interface MessageRestoreListener {
-        public void messagesRestored(Collection<RestoredMessage> msgs);
-    }
-
     public BrokerDatabase(Store store, IDispatcher dispatcher) {
         this.store = store;
         this.dispatcher = dispatcher;
         this.opQueue = new LinkedNodeList<OperationBase>();
-        storeLimiter = new SizeLimiter<OperationBase>(5000, 0) {
+        storeLimiter = new SizeLimiter<OperationBase>(10000, 5000) {
 
             @Override
             public int getElementSize(OperationBase op) {
@@ -153,6 +144,11 @@
     public synchronized void stop() throws Exception {
         if (flushThread != null) {
 
+            synchronized(opQueue)
+            {
+                updateFlushPointer(opSequenceNumber + 1);
+            }
+            
             running.set(false);
             boolean interrupted = false;
             while (true) {
@@ -175,6 +171,18 @@
         }
     }
 
+    public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
+        // TODO Auto-generated method stub
+        return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
+
+            public Iterator<QueueQueryResult> execute(Session session) throws Exception {
+                // TODO Auto-generated method stub
+                return session.queueListByType(type, null, Integer.MAX_VALUE);
+            }
+
+        }, null);
+    }
+
     /**
      * Executes user supplied {@link Operation}. If the {@link Operation} does
      * not throw any Exceptions, all updates to the store are committed,
@@ -214,7 +222,7 @@
         synchronized (opQueue) {
             op.opSequenceNumber = opSequenceNumber++;
             opQueue.addLast(op);
-            if (op.flushRequested) {
+            if (op.flushRequested || storeLimiter.getThrottled()) {
                 if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
                     scheduleDelayedFlush(op.opSequenceNumber);
                 } else {
@@ -227,7 +235,8 @@
     private void updateFlushPointer(long seqNumber) {
         if (seqNumber > flushPointer) {
             flushPointer = seqNumber;
-            if (notify.get()) {
+            OperationBase op = opQueue.getHead();
+            if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) {
                 opsReady.release();
             }
         }
@@ -253,14 +262,18 @@
         synchronized (opQueue) {
             if (flushPointer < requestedDelayedFlushPointer) {
                 updateFlushPointer(requestedDelayedFlushPointer);
-                requestedDelayedFlushPointer = -1;
-                // Schedule next delay if needed:
-                if (delayedFlushPointer > flushPointer) {
-                    scheduleDelayedFlush(delayedFlushPointer);
-                } else {
-                    delayedFlushPointer = -1;
-                }
+                
+            }
+            
+            //If another delayed flush has been scheduled schedule it:
+            requestedDelayedFlushPointer = -1;
+            // Schedule next delay if needed:
+            if (delayedFlushPointer > flushPointer) {
+                scheduleDelayedFlush(delayedFlushPointer);
+            } else {
+                delayedFlushPointer = -1;
             }
+            
         }
     }
 
@@ -268,7 +281,7 @@
         if (!wait) {
             synchronized (opQueue) {
                 OperationBase op = opQueue.getHead();
-                if (op != null && op.opSequenceNumber <= flushPointer) {
+                if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) {
                     op.unlink();
                     return op;
                 }
@@ -293,19 +306,15 @@
         }
     }
 
-    private class OpCounter {
-        int count = 0;
-    }
-
     private final void processOps() {
-        final OpCounter counter = new OpCounter();
+        int count = 0;
 
         while (running.get()) {
             final OperationBase firstOp = getNextOp(true);
             if (firstOp == null) {
                 continue;
             }
-            counter.count = 0;
+            count = 0;
 
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
@@ -317,26 +326,27 @@
                     // refactor.
                     while (op != null) {
                         final Operation toExec = op;
-                        counter.count++;
-                        //System.out.println("Executing " + op);
-                        store.execute(new Store.VoidCallback<Exception>() {
-                            @Override
-                            public void run(Session session) throws Exception {
-
-                                // Try to execute the operation against the
-                                // session...
-                                try {
-                                    if (toExec.execute(session)) {
+                        if (toExec.beginExecute()) {
+                            count++;
+
+                            store.execute(new Store.VoidCallback<Exception>() {
+                                @Override
+                                public void run(Session session) throws Exception {
+
+                                    // Try to execute the operation against the
+                                    // session...
+                                    try {
+                                        toExec.execute(session);
                                         processedQueue.add(toExec);
-                                    } else {
-                                        counter.count--;
+                                    } catch (CancellationException ignore) {
+                                        // System.out.println("Cancelled" +
+                                        // toExec);
                                     }
-                                } catch (CancellationException ignore) {
-                                    //System.out.println("Cancelled" + toExec);
                                 }
-                            }
-                        }, null);
-                        if (counter.count < 1000) {
+                            }, null);
+                        }
+                        
+                        if (count < 1000) {
                             op = getNextOp(false);
                         } else {
                             op = null;
@@ -370,11 +380,12 @@
                     for (Operation processed : processedQueue) {
                         processed.onRollback(e);
                     }
+                    onDatabaseException(new IOException(e));
                 } catch (Exception e) {
                     for (Operation processed : processedQueue) {
                         processed.onRollback(e);
                     }
-
+                    onDatabaseException(new IOException(e));
                 }
             }
         }
@@ -405,7 +416,7 @@
      * @param queue
      *            The queue to add.
      */
-    public void addQueue(AsciiBuffer queue) {
+    public void addQueue(QueueStore.QueueDescriptor queue) {
         add(new QueueAddOperation(queue), null, false);
     }
 
@@ -415,7 +426,7 @@
      * @param queue
      *            The queue to delete.
      */
-    public void deleteQueue(AsciiBuffer queue) {
+    public void deleteQueue(QueueStore.QueueDescriptor queue) {
         add(new QueueDeleteOperation(queue), null, false);
     }
 
@@ -449,8 +460,8 @@
      * 
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext saveMessage(MessageDelivery delivery, AsciiBuffer queue, ISourceController<?> source) throws IOException {
-        return add(new AddMessageOperation(delivery, queue), source, false);
+    public OperationContext saveMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue, long queueSequence, ISourceController<?> source) throws IOException {
+        return add(new AddMessageOperation(delivery, queue, queueSequence), source, false);
     }
 
     /**
@@ -462,8 +473,8 @@
      *            The queue.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext deleteMessage(MessageDelivery delivery, AsciiBuffer queue) {
-        return add(new DeleteMessageOperation(delivery, queue), null, false);
+    public OperationContext deleteMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue) {
+        return add(new DeleteMessageOperation(delivery.getStoreTracking(), queue), null, false);
     }
 
     /**
@@ -476,20 +487,25 @@
      * @param queue
      *            The queue for which to load messages
      * @param first
-     *            The first queue sequence number to load.
+     *            The first queue sequence number to load (-1 starts at
+     *            begining)
+     * @param maxSequence
+     *            The maximum sequence number to load (-1 if no limit)
      * @param max
-     *            The maximum number of messages to load.
+     *            The maximum number of messages to load (-1 if no limit)
      * @param listener
      *            The listener to which messags should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext restoreMessages(AsciiBuffer queue, long first, int max, MessageRestoreListener listener) {
-        return add(new RestoreMessageOperation(queue, first, max, listener), null, false);
+    public OperationContext restoreMessages(QueueStore.QueueDescriptor queue, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+        return add(new RestoreMessageOperation(queue, first, maxCount, maxSequence, listener), null, true);
     }
 
     private void onDatabaseException(IOException ioe) {
         if (listener != null) {
             listener.onDatabaseException(ioe);
+        } else {
+            ioe.printStackTrace();
         }
     }
 
@@ -513,6 +529,12 @@
          */
         public boolean executed();
 
+        /**
+         * Requests flush for this database operation (overriding a previous
+         * delay)
+         */
+        public void requestFlush();
+
     }
 
     /**
@@ -524,6 +546,14 @@
     public interface Operation extends OperationContext {
 
         /**
+         * Called when the saver is about to execute the operation. If true is
+         * returned the operation can no longer be canceled.
+         * 
+         * @return false if the operation has been canceled.
+         */
+        public boolean beginExecute();
+
+        /**
          * Gets called by the
          * {@link Store#add(Operation, ISourceController, boolean)} method
          * within a transactional context. If any exception is thrown including
@@ -532,7 +562,6 @@
          * @param session
          *            provides you access to read and update the persistent
          *            data.
-         * @return the result of the CallableCallback
          * @throws Exception
          *             if an system error occured while executing the
          *             operations.
@@ -540,7 +569,7 @@
          *             if an system error occured while executing the
          *             operations.
          */
-        public boolean execute(Session session) throws CancellationException, Exception, RuntimeException;
+        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
 
         /**
          * Returns true if this operation can be delayed. This is useful in
@@ -608,6 +637,21 @@
         }
 
         /**
+         * Called when the saver is about to execute the operation. If true is
+         * returned the operation can no longer be cancelled.
+         * 
+         * @return true if operation should be executed
+         */
+        public final boolean beginExecute() {
+            if (executePending.compareAndSet(true, false)) {
+                executed.set(true);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /**
          * Gets called by the
          * {@link Store#add(Operation, ISourceController, boolean)} method
          * within a transactional context. If any exception is thrown including
@@ -616,7 +660,6 @@
          * @param session
          *            provides you access to read and update the persistent
          *            data.
-         * @return True if processed, false otherwise
          * @throws Exception
          *             if an system error occured while executing the
          *             operations.
@@ -624,14 +667,9 @@
          *             if an system error occured while executing the
          *             operations.
          */
-        public boolean execute(Session session) throws Exception, RuntimeException {
-            if (executePending.compareAndSet(true, false)) {
-                executed.set(true);
-                doExcecute(session);
-                return true;
-            } else {
-                return false;
-            }
+        public void execute(Session session) throws Exception, RuntimeException {
+            if(DEBUG) System.out.println("Executing " + this);
+            doExcecute(session);
         }
 
         abstract protected void doExcecute(Session session);
@@ -644,6 +682,16 @@
             return false;
         }
 
+        /**
+         * Requests flush for this database operation (overriding a previous
+         * delay)
+         */
+        public void requestFlush() {
+            synchronized (this) {
+                updateFlushPointer(opSequenceNumber);
+            }
+        }
+
         public void onCommit() {
         }
 
@@ -662,10 +710,10 @@
 
     private class QueueAddOperation extends OperationBase {
 
-        private AsciiBuffer queue;
+        private QueueStore.QueueDescriptor qd;
 
-        QueueAddOperation(AsciiBuffer queue) {
-            this.queue = queue;
+        QueueAddOperation(QueueStore.QueueDescriptor queue) {
+            qd = queue;
         }
 
         @Override
@@ -676,7 +724,11 @@
 
         @Override
         protected void doExcecute(Session session) {
-            session.queueAdd(queue);
+            try {
+                session.queueAdd(qd);
+            } catch (KeyNotFoundException e) {
+                throw new FatalStoreException(e);
+            }
         }
 
         @Override
@@ -685,16 +737,16 @@
         }
 
         public String toString() {
-            return "QueueAdd: " + queue.toString();
+            return "QueueAdd: " + qd.getQueueName().toString();
         }
     }
 
     private class QueueDeleteOperation extends OperationBase {
 
-        private AsciiBuffer queue;
+        private QueueStore.QueueDescriptor qd;
 
-        QueueDeleteOperation(AsciiBuffer queue) {
-            this.queue = queue;
+        QueueDeleteOperation(QueueStore.QueueDescriptor queue) {
+            qd = queue;
         }
 
         @Override
@@ -705,7 +757,7 @@
 
         @Override
         protected void doExcecute(Session session) {
-            session.queueRemove(queue);
+            session.queueRemove(qd);
         }
 
         @Override
@@ -714,23 +766,23 @@
         }
 
         public String toString() {
-            return "QueueDelete: " + queue.toString();
+            return "QueueDelete: " + qd.getQueueName().toString();
         }
     }
 
     private class DeleteMessageOperation extends OperationBase {
         private final long storeTracking;
-        private AsciiBuffer queue;
+        private QueueStore.QueueDescriptor queue;
 
-        public DeleteMessageOperation(MessageDelivery delivery, AsciiBuffer queue) {
-            this.storeTracking = delivery.getStoreTracking();
+        public DeleteMessageOperation(long tracking, QueueStore.QueueDescriptor queue) {
+            this.storeTracking = tracking;
             this.queue = queue;
         }
 
         @Override
         public int getLimiterSize() {
             // Might consider bumping this up to avoid too much accumulation?
-            return 0;
+            return 1;
         }
 
         @Override
@@ -757,16 +809,18 @@
     }
 
     private class RestoreMessageOperation extends OperationBase {
-        private AsciiBuffer queue;
+        private QueueStore.QueueDescriptor queue;
         private long firstKey;
         private int maxRecords;
-        private MessageRestoreListener listener;
-        private Collection<RestoredMessage> msgs = null;
+        private long maxSequence;
+        private RestoreListener<MessageDelivery> listener;
+        private Collection<RestoredElement<MessageDelivery>> msgs = null;
 
-        RestoreMessageOperation(AsciiBuffer queue, long firstKey, int maxRecords, MessageRestoreListener listener) {
+        RestoreMessageOperation(QueueStore.QueueDescriptor queue, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
             this.queue = queue;
             this.firstKey = firstKey;
             this.maxRecords = maxRecords;
+            this.maxSequence = maxSequence;
             this.listener = listener;
         }
 
@@ -775,17 +829,47 @@
 
             Iterator<QueueRecord> records = null;
             try {
-                records = session.queueListMessagesQueue(queue, firstKey, maxRecords);
-
+                records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
+                msgs = new LinkedList<RestoredElement<MessageDelivery>>();
             } catch (KeyNotFoundException e) {
-                msgs = new ArrayList<RestoredMessage>(0);
+                msgs = new ArrayList<RestoredElement<MessageDelivery>>(0);
                 return;
             }
 
-            while (records.hasNext()) {
+            QueueRecord qRecord = null;
+            int count = 0;
+            if (records.hasNext()) {
+                qRecord = records.next();
+            }
+
+            while (qRecord != null) {
                 RestoredMessageImpl rm = new RestoredMessageImpl();
                 // TODO should update jms redelivery here.
-                rm.qRecord = records.next();
+                rm.qRecord = qRecord;
+                count++;
+
+                // Set the next sequence number:
+                if (records.hasNext()) {
+                    qRecord = records.next();
+                    rm.nextSequence = qRecord.getQueueKey();
+                } else {
+                     // Look up the next sequence number:
+                    try {
+                        records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1);
+                        if(!records.hasNext())
+                        {
+                            rm.nextSequence = -1;
+                        }
+                        else
+                        {
+                            rm.nextSequence = records.next().queueKey;
+                        }
+                    } catch (KeyNotFoundException e) {
+                        rm.nextSequence = -1;
+                    }
+                    qRecord = null;
+                }
+
                 try {
                     rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
                     rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
@@ -797,14 +881,19 @@
                             throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
                         }
                     }
+                    msgs.add(rm);
                 } catch (KeyNotFoundException shouldNotHappen) {
+                    shouldNotHappen.printStackTrace();
                 }
             }
+
+            if(DEBUG)
+                System.out.println("Restored: " + count + " messages");
         }
 
         @Override
         public void onCommit() {
-            listener.messagesRestored(msgs);
+            listener.elementsRestored(msgs);
         }
 
         public String toString() {
@@ -817,7 +906,8 @@
         private final BrokerMessageDelivery brokerDelivery;
 
         private final MessageDelivery delivery;
-        private final AsciiBuffer target;
+        private final long queueSequence;
+        private final QueueStore.QueueDescriptor target;
         private MessageRecord record;
 
         private final boolean delayable;
@@ -825,6 +915,7 @@
         public AddMessageOperation(BrokerMessageDelivery delivery) throws IOException {
             this.brokerDelivery = delivery;
             this.delivery = delivery;
+            this.queueSequence = -1;
             target = null;
             this.delayable = delivery.isFlushDelayable();
             if (!delayable) {
@@ -832,10 +923,11 @@
             }
         }
 
-        public AddMessageOperation(MessageDelivery delivery, AsciiBuffer target) throws IOException {
+        public AddMessageOperation(MessageDelivery delivery, QueueStore.QueueDescriptor target, long queueSequence) throws IOException {
             this.brokerDelivery = null;
             this.delivery = delivery;
             this.target = target;
+            this.queueSequence = queueSequence;
             this.record = delivery.createMessageRecord();
             delayable = false;
         }
@@ -854,7 +946,7 @@
 
             if (target == null) {
                 brokerDelivery.beginStore();
-                Collection<AsciiBuffer> targets = brokerDelivery.getPersistentQueues();
+                Set<Entry<QueueDescriptor, Long>> targets = brokerDelivery.getPersistentQueues();
 
                 if (!targets.isEmpty()) {
                     if (record == null) {
@@ -867,12 +959,14 @@
                     record.setKey(delivery.getStoreTracking());
                     session.messageAdd(record);
 
-                    for (AsciiBuffer target : brokerDelivery.getPersistentQueues()) {
+                    for (Entry<QueueDescriptor, Long> target : brokerDelivery.getPersistentQueues()) {
                         try {
                             QueueRecord queueRecord = new QueueRecord();
                             queueRecord.setAttachment(null);
                             queueRecord.setMessageKey(record.getKey());
-                            session.queueAddMessage(target, queueRecord);
+                            queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+                            queueRecord.setQueueKey(target.getValue());
+                            session.queueAddMessage(target.getKey(), queueRecord);
 
                         } catch (KeyNotFoundException e) {
                             e.printStackTrace();
@@ -890,6 +984,8 @@
                     QueueRecord queueRecord = new QueueRecord();
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(record.getKey());
+                    queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+                    queueRecord.setQueueKey(queueSequence);
                     session.queueAddMessage(target, queueRecord);
                 } catch (KeyNotFoundException e) {
                     e.printStackTrace();
@@ -907,18 +1003,35 @@
         }
     }
 
-    private class RestoredMessageImpl implements RestoredMessage {
+    private class RestoredMessageImpl implements RestoredElement<MessageDelivery> {
         QueueRecord qRecord;
         MessageRecord mRecord;
         ProtocolHandler handler;
+        long nextSequence = -1;
+
+        public MessageDelivery getElement() throws IOException {
+            BrokerMessageDelivery delivery = handler.createMessageDelivery(mRecord);
+            delivery.setFromDatabase(BrokerDatabase.this, mRecord);
+            return delivery;
+        }
+
+        public long getSequenceNumber() {
+            return qRecord.getQueueKey();
+        }
+
+        public long getStoreTracking() {
+            return qRecord.getMessageKey();
+        }
 
-        public MessageDelivery getMessageDelivery() throws IOException {
-            return handler.createMessageDelivery(mRecord);
+        public long getNextSequenceNumber() {
+            return nextSequence;
         }
+
     }
 
     public long allocateStoreTracking() {
         // TODO Auto-generated method stub
         return store.allocateStoreTracking();
     }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java Mon Apr 27 18:40:44 2009
@@ -1,124 +0,0 @@
-package org.apache.activemq.broker.store;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStoreHelper;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISinkController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
-
-
-public class MessageDeliveryStoreHelper implements QueueStoreHelper<MessageDelivery>, Dispatchable, BrokerDatabase.MessageRestoreListener {
-
-    private final BrokerDatabase database;
-    private final AsciiBuffer queue;
-    private final DispatchContext dispatchContext;
-    private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
-    private final IFlowRelay<MessageDelivery> restoreRelay;
-    private final SizeLimiter<MessageDelivery> restoreLimiter;
-    private final IFlowController<MessageDelivery> controller;
-    private final FlowUnblockListener<MessageDelivery> unblockListener;
-    private final IFlowSink<MessageDelivery> targetSink;
-
-    private int RESTORE_BATCH_SIZE = 50;
-    
-    private AtomicBoolean started = new AtomicBoolean(false);
-    private AtomicBoolean restoreComplete = new AtomicBoolean(false);
-    private AtomicBoolean storeLoaded = new AtomicBoolean(false);
-
-    private static enum State {
-        STOPPED, RESTORING, RESTORED
-    };
-
-    private State state = State.RESTORING;
-
-    MessageDeliveryStoreHelper(BrokerDatabase database, AsciiBuffer queueName, IFlowSink<MessageDelivery> sink, IDispatcher dispatcher) {
-        this.database = database;
-        this.queue = queueName;
-        this.targetSink = sink;
-        Flow flow = new Flow("MessageRestorer-" + queue, false);
-        restoreLimiter = new SizeLimiter<MessageDelivery>(1000, 500) {
-            @Override
-            public int getElementSize(MessageDelivery msg) {
-                return msg.getFlowLimiterSize();
-            }
-        };
-        restoreRelay = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), restoreLimiter);
-        controller = restoreRelay.getFlowController(flow);
-        dispatchContext = dispatcher.register(this, flow.getFlowName());
-
-        unblockListener = new FlowUnblockListener<MessageDelivery>() {
-            public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
-                dispatchContext.requestDispatch();
-            }
-        };
-    }
-
-    public void delete(MessageDelivery elem, boolean flush) {
-        elem.delete(queue);
-    }
-
-    public void save(MessageDelivery elem, boolean flush) throws IOException {
-        elem.persist(queue, !flush);
-    }
-
-    public boolean hasStoredElements() {
-        return !restoreComplete.get();
-    }
-
-    public void startLoadingQueue() {
-        database.restoreMessages(queue, 0, RESTORE_BATCH_SIZE, this);
-    }
-
-    public void stopLoadingQueue() {
-        // TODO Auto-generated method stub
-    }
-
-    public boolean dispatch() {
-
-        RestoredMessage restored = restoredMsgs.poll();
-        if (restored == null || restoreComplete.get()) {
-            return true;
-        }
-
-        if (controller.isSinkBlocked()) {
-            if (controller.addUnblockListener(unblockListener)) {
-                return true;
-            }
-        } else {
-            try {
-                targetSink.add(restored.getMessageDelivery(), controller);
-            } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-
-        return false;
-    }
-
-    public void messagesRestored(Collection<RestoredMessage> msgs) {
-        if (!msgs.isEmpty()) {
-            restoredMsgs.addAll(msgs);
-        } else {
-            storeLoaded.set(true);
-        }
-        dispatchContext.requestDispatch();
-
-        dispatchContext.requestDispatch();
-    }
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Mon Apr 27 18:40:44 2009
@@ -17,11 +17,14 @@
 package org.apache.activemq.broker.store;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 
 /**
  * Interface to persistently store and access data needed by the messaging
@@ -163,6 +166,33 @@
         Long queueKey;
         Long messageKey;
         Buffer attachment;
+        int size;
+        boolean redelivered;
+        long tte;
+        
+        public boolean isRedelivered() {
+            return redelivered;
+        }
+
+        public void setRedelivered(boolean redelivered) {
+            this.redelivered = redelivered;
+        }
+
+        public long getTte() {
+            return tte;
+        }
+
+        public void setTte(long tte) {
+            this.tte = tte;
+        }
+
+        public int getSize() {
+            return size;
+        }
+
+        public void setSize(int size) {
+            this.size = size;
+        }
 
         public Long getQueueKey() {
             return queueKey;
@@ -194,6 +224,16 @@
         Long key = (long) -1;
         AsciiBuffer messageId;
         AsciiBuffer encoding;
+        int size;
+
+        public int getSize() {
+            return size;
+        }
+
+        public void setSize(int size) {
+            this.size = size;
+        }
+
         Buffer buffer;
         Long streamKey;
 
@@ -239,6 +279,48 @@
     }
 
     /**
+     * Result Holder for queue related queries.
+     */
+    public interface QueueQueryResult {
+
+        /**
+         * @return the descriptor for the queue.
+         */
+        public QueueStore.QueueDescriptor getDescriptor();
+
+        /**
+         * Gets the count of elements in this queue. Note that this does not
+         * include counts for elements held in child partitions.
+         * 
+         * @return the number of elements in the queue.
+         */
+        public int getCount();
+
+        /**
+         * Gets the size of elements in this queue. Note that this does not
+         * include size of elements held in child partitions.
+         * 
+         * @return the total size of elements in the queue
+         */
+        public long getSize();
+
+        /**
+         * @return the first sequence number in the queue.
+         */
+        public long getFirstSequence();
+        
+        /**
+         * @return the last sequence number in the queue.
+         */
+        public long getLastSequence();
+        
+        /**
+         * @return The results for this queue's partitions
+         */
+        public Collection<QueueQueryResult> getPartitions();
+    }
+
+    /**
      * Executes user supplied {@link Callback}. If the {@link Callback} does not
      * throw any Exceptions, all updates to the store are committed to the store
      * as a single unit of work, otherwise they are rolled back.
@@ -297,24 +379,81 @@
 
         public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
 
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
 
         public void transactionCommit(Buffer txid) throws KeyNotFoundException;
 
         public void transactionRollback(Buffer txid) throws KeyNotFoundException;
 
-        // Queue related methods.
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
+        /**
+         * Gets a list of queues. The returned iterator returns top-level queues
+         * (e.g. queues without a parent). The child queues are accessible via
+         * {@link QueueQueryResult#getPartitions()}.
+         * 
+         * @param firstQueueName
+         *            If null starts the query at the first queue.
+         * @param max
+         *            The maximum number of queues to return
+         * @return The list of queues.
+         */
+        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueueName, int max);
 
-        public void queueAdd(AsciiBuffer queueName);
+        /**
+         * Gets a list of queues for which
+         * {@link QueueDescriptor#getQueueType()} matches the specified type.
+         * The returned iterator returns top-level queues (e.g. queues without a
+         * parent). The child queues are accessible via
+         * {@link QueueQueryResult#getPartitions()}.
+         * 
+         * @param firstQueueName
+         *            If null starts the query at the first queue.
+         * @param max
+         *            The maximum number of queues to return
+         * @param type
+         *            The type of queue to consider
+         * @return The list of queues.
+         */
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueueName, int max);
 
-        public void queueRemove(AsciiBuffer queueName);
+        /**
+         * Adds a queue. If {@link QueueDescriptor#getParent()} is specified
+         * then the parent queue must exist.
+         * 
+         * @param queue
+         *            The queue to add.
+         * 
+         * @throws KeyNotFoundException
+         *             if the descriptor specifies a non existent parent
+         */
+        public void queueAdd(QueueStore.QueueDescriptor queue) throws KeyNotFoundException;
 
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
+        /**
+         * Deletes a queue and all of it's messages. If it has any child
+         * partitions they are deleted as well.
+         * 
+         * @param queue
+         *            The queue to delete
+         */
+        public void queueRemove(QueueStore.QueueDescriptor queue);
+
+        /**
+         * Adds a reference to the message for the given queue. The associated
+         * queue record contains the sequence number of the message in this
+         * queue and the store tracking number of the associated message.
+         * 
+         * @param queue
+         *            The queue descriptor
+         * @param record
+         *            The queue record
+         * @throws KeyNotFoundException
+         *             If there is no message associated with
+         *             {@link QueueRecord#getMessageKey()}
+         */
+        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
 
-        public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
 
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Mon Apr 27 18:40:44 2009
@@ -20,21 +20,28 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
 
 public class DestinationEntity {
-    
-    public final static  Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
-        
+
+    public final static Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
+
         public Class<DestinationEntity> getType() {
             return DestinationEntity.class;
         }
@@ -43,103 +50,229 @@
             DestinationEntity value = new DestinationEntity();
             value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
             value.trackingIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
+            value.descriptor = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
+            value.metaData = new Page<DestinationMetaData>(dataIn.readLong());
             return value;
         }
 
         public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
             dataOut.writeLong(value.queueIndex.getPageId());
             dataOut.writeLong(value.trackingIndex.getPageId());
+            Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.descriptor, dataOut);
+            dataOut.writeLong(value.metaData.getPageId());
         }
     };
 
-    private long nextQueueKey;
+    public final static Marshaller<DestinationMetaData> META_DATA_MARSHALLER = new Marshaller<DestinationMetaData>() {
+
+        public Class<DestinationMetaData> getType() {
+            return DestinationMetaData.class;
+        }
+
+        public DestinationMetaData readPayload(DataInput dataIn) throws IOException {
+            DestinationMetaData value = new DestinationMetaData();
+            value.count = dataIn.readInt();
+            value.size = dataIn.readLong();
+            return value;
+        }
+
+        public void writePayload(DestinationMetaData value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.count);
+            dataOut.writeLong(value.size);
+        }
+    };
+
+    public Class<DestinationEntity> getType() {
+        return DestinationEntity.class;
+    }
+
     private BTreeIndex<Long, QueueRecord> queueIndex;
     private BTreeIndex<Long, Long> trackingIndex;
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // Descriptor for this queue:
+    private QueueStore.QueueDescriptor descriptor;
+
+    // Child Partitions:
+    private HashSet<DestinationEntity> partitions;
+
+    // Holds volatile queue meta data
+    private Page<DestinationMetaData> metaData;
+
+    // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     public void allocate(Transaction tx) throws IOException {
         queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
         trackingIndex = new BTreeIndex<Long, Long>(tx.allocate());
+        metaData = tx.allocate();
+        metaData.set(new DestinationMetaData());
+        tx.store(metaData, META_DATA_MARSHALLER, true);
     }
-    
+
     public void deallocate(Transaction tx) throws IOException {
         queueIndex.clear(tx);
         trackingIndex.clear(tx);
         tx.free(trackingIndex.getPageId());
         tx.free(queueIndex.getPageId());
-        queueIndex=null;
-        trackingIndex=null;
+        tx.free(metaData.getPageId());
+        queueIndex = null;
+        trackingIndex = null;
+        metaData = null;
     }
-    
+
     public void load(Transaction tx) throws IOException {
-        if( queueIndex.getPageFile()==null ) {
-            
+        if (queueIndex.getPageFile() == null) {
+
             queueIndex.setPageFile(tx.getPageFile());
             queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
             queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
             queueIndex.load(tx);
-    
-            // Figure out the next key using the last entry in the destination.
-            Entry<Long, QueueRecord> lastEntry = queueIndex.getLast(tx);
-            if( lastEntry!=null ) {
-                nextQueueKey = lastEntry.getKey()+1;
-            }
         }
-        
-        if( trackingIndex.getPageFile()==null ) {
-            
+
+        if (trackingIndex.getPageFile() == null) {
+
             trackingIndex.setPageFile(tx.getPageFile());
             trackingIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
             trackingIndex.setValueMarshaller(LongMarshaller.INSTANCE);
             trackingIndex.load(tx);
         }
+
+        tx.load(metaData, META_DATA_MARSHALLER);
     }
-    
-    ///////////////////////////////////////////////////////////////////
-    // Message Methods.
-    ///////////////////////////////////////////////////////////////////
-    public Long nextQueueKey() {
-        return nextQueueKey++;
+
+    private static final boolean unlimited(Number val) {
+        return val == null || val.longValue() < 0;
     }
+
+    private DestinationMetaData getMetaData(Transaction tx) throws IOException {
+        tx.load(metaData, META_DATA_MARSHALLER);
+        return metaData.get();
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Message Methods.
+    // /////////////////////////////////////////////////////////////////
     
-    public void add(Transaction tx, QueueAddMessage command) throws IOException {
-        QueueRecord value = new QueueRecord();
-        value.setAttachment(command.getAttachment());
-        value.setMessageKey(command.getMessageKey());
-        value.setQueueKey(command.getQueueKey());
-        queueIndex.put(tx, value.getQueueKey(), value);
-        trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+    public long getSize(Transaction tx) throws IOException {
+        return getMetaData(tx).size;
+    }
+
+    public int getCount(Transaction tx) throws IOException {
+        return getMetaData(tx).count;
+    }
+
+    public long getFirstSequence(Transaction tx) throws IOException {
+        return getMetaData(tx).count == 0 ? 0 : queueIndex.getFirst(tx).getValue().getQueueKey();
+    }
+
+    public long getLastSequence(Transaction tx) throws IOException {
+        return getMetaData(tx).count == 0 ? 0 : queueIndex.getLast(tx).getValue().getQueueKey();
+    }
+
+    public void setQueueDescriptor(QueueStore.QueueDescriptor queue) {
+        descriptor = queue;
+    }
+
+    public QueueStore.QueueDescriptor getDescriptor() {
+        return descriptor;
+    }
+
+    public void addPartition(DestinationEntity destination) {
+        if (partitions == null) {
+            partitions = new HashSet<DestinationEntity>();
+        }
+
+        partitions.add(destination);
+    }
+
+    public void removePartition(DestinationEntity queue) {
+        if (partitions == null) {
+            return;
+        }
+
+        partitions.remove(queue);
+        if (partitions.isEmpty()) {
+            partitions = null;
+        }
+    }
+
+    public Iterator<DestinationEntity> getPartitions() {
+        if (partitions == null) {
+            return null;
+        } else {
+            return partitions.iterator();
+        }
+    }
+
+    public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
+
+        Long existing = trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+        if (existing == null) {
+            QueueRecord value = new QueueRecord();
+            value.setAttachment(command.getAttachment());
+            value.setMessageKey(command.getMessageKey());
+            value.setQueueKey(command.getQueueKey());
+            value.setSize(command.getMessageSize());
+
+            QueueRecord rc = queueIndex.put(tx, value.getQueueKey(), value);
+            if (rc == null) {
+                // TODO It seems a little inefficient to continually serialize
+                // the queue size. It might be better to update this only at
+                // commit
+                // timeespecially if we start doing multiple adds per
+                // transaction.
+                // It is also possible that we might want to remove this update
+                // altogether in favor of scanning the whole queue at recovery
+                // time (at the cost of startup time)
+                getMetaData(tx).update(1, command.getMessageSize());
+                tx.store(metaData, META_DATA_MARSHALLER, true);
+            } else {
+                throw new Store.FatalStoreException(new Store.DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + descriptor.getQueueName()));
+            }
+        } else {
+            throw new Store.DuplicateKeyException("Duplicate tracking " + command.getMessageKey() + " for " + descriptor.getQueueName());
+        }
     }
 
     public boolean remove(Transaction tx, long msgKey) throws IOException {
         Long queueKey = trackingIndex.remove(tx, msgKey);
-        if(queueKey != null)
-        {
-            queueIndex.remove(tx, queueKey);
+        if (queueKey != null) {
+            QueueRecord qr = queueIndex.remove(tx, queueKey);
+            getMetaData(tx).update(-1, -qr.getSize());
+            tx.store(metaData, META_DATA_MARSHALLER, true);
             return true;
         }
         return false;
     }
 
-    public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
-        final ArrayList<QueueRecord> rc = new ArrayList<QueueRecord>(max);
+    public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {
+        Collection<QueueRecord> rc;
+        if (unlimited(max)) {
+            rc = new LinkedList<QueueRecord>();
+        } else {
+            rc = new ArrayList<QueueRecord>(max);
+        }
         
         Iterator<Entry<Long, QueueRecord>> iterator;
-        if( firstQueueKey!=null ) {
-            iterator = queueIndex.iterator(tx, firstQueueKey);
-        } else {
+        if (unlimited(firstQueueKey)) {
             iterator = queueIndex.iterator(tx);
+
+        } else {
+            iterator = queueIndex.iterator(tx, firstQueueKey);
         }
+        boolean sequenceLimited = !unlimited(maxQueueKey);
+        boolean countLimited = !unlimited(max);
         while (iterator.hasNext()) {
-            if( rc.size() >= max ) {
+            if (countLimited && rc.size() >= max) {
                 break;
             }
             Map.Entry<Long, QueueRecord> entry = iterator.next();
+            if (sequenceLimited && entry.getValue().getQueueKey() > maxQueueKey) {
+                break;
+            }
             rc.add(entry.getValue());
         }
-        
+
         return rc.iterator();
     }
 
@@ -147,5 +280,19 @@
         return trackingIndex.iterator(tx);
     }
 
+    public static class DestinationMetaData {
+        int count;
+        long size;
+
+        public void update(int count, long size) {
+            this.count += count;
+            this.size += size;
+        }
+
+        public void set(int count, long size) {
+            this.count = count;
+            this.size = size;
+        }
+    }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Apr 27 18:40:44 2009
@@ -49,6 +49,8 @@
 import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.Journal;
@@ -102,6 +104,7 @@
 
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+    private boolean recovering;
 
     private static class UoWOperation {
         public TypeCreatable bean;
@@ -114,7 +117,14 @@
     // /////////////////////////////////////////////////////////////////
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            load();
+            try
+            {
+                load();
+            }
+            catch (Exception e)
+            {
+                LOG.error("Error loading store", e);
+            }
         }
     }
 
@@ -216,6 +226,7 @@
             };
             checkpointThread.start();
             recover();
+            trackingGen.set(rootEntity.getLastMessageTracking() + 1);
         }
     }
 
@@ -267,6 +278,11 @@
                 rootEntity.setState(CLOSED_STATE);
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
+                        // Set the last update to the next update (otherwise
+                        // we'll replay the last update
+                        // since location marshaller doesn't marshal the
+                        // location's size:
+                        rootEntity.setLastUpdate(journal.getNextLocation(rootEntity.getLastUpdate()));
                         rootEntity.store(tx);
                     }
                 });
@@ -294,7 +310,7 @@
         indexLock.writeLock().lock();
         try {
             long start = System.currentTimeMillis();
-
+            recovering = true;
             ArrayList<UoWOperation> uow = null;
             Location recoveryPosition = getRecoveryPosition();
             if (recoveryPosition != null) {
@@ -352,6 +368,7 @@
                 }
             });
         } finally {
+            recovering = false;
             indexLock.writeLock().unlock();
         }
     }
@@ -359,6 +376,7 @@
     public void incrementalRecover() throws IOException {
         indexLock.writeLock().lock();
         try {
+            recovering = true;
             if (nextRecoveryPosition == null) {
                 if (lastRecoveryPosition == null) {
                     nextRecoveryPosition = getRecoveryPosition();
@@ -381,6 +399,7 @@
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
         } finally {
+            recovering = false;
             indexLock.writeLock().unlock();
         }
     }
@@ -492,6 +511,10 @@
         }
 
         rootEntity.setState(OPEN_STATE);
+        // Set the last update to the next update (otherwise we'll replay the
+        // last update
+        // since location marshaller doesn't marshal the location's size:
+        rootEntity.setLastUpdate(journal.getNextLocation(rootEntity.getLastUpdate()));
         rootEntity.store(tx);
         pageFile.flush();
 
@@ -633,6 +656,7 @@
                 }
             });
             rootEntity.setLastUpdate(location);
+
         } finally {
             indexLock.writeLock().unlock();
         }
@@ -736,6 +760,8 @@
 
     @SuppressWarnings("unchecked")
     public void updateIndex(Transaction tx, Type type, MessageBuffer command, Location location) throws IOException {
+        // System.out.println("Updating index" + type.toString() + " loc: " +
+        // location);
         switch (type) {
         case MESSAGE_ADD:
             messageAdd(tx, (MessageAdd) command, location);
@@ -775,23 +801,44 @@
     }
 
     private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
-        rootEntity.queueAdd(tx, command.getQueueName());
+        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        qd.setQueueName(command.getQueueName());
+        qd.setApplicationType((short) command.getApplicationType());
+        qd.setQueueType((short) command.getQueueType());
+        if (command.hasParentName()) {
+            qd.setParent(command.getParentName());
+            qd.setPartitionId(command.getPartitionId());
+        }
+
+        rootEntity.queueAdd(tx, qd);
     }
 
     private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
-        rootEntity.queueRemove(tx, command.getQueueName());
+        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        qd.setQueueName(command.getQueueName());
+        rootEntity.queueRemove(tx, qd);
     }
 
     private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
-        DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        qd.setQueueName(command.getQueueName());
+        DestinationEntity destination = rootEntity.getDestination(qd);
         if (destination != null) {
-            destination.add(tx, command);
+            try {
+                destination.add(tx, command);
+            } catch (DuplicateKeyException e) {
+                if (!recovering) {
+                    throw new FatalStoreException(e);
+                }
+            }
             rootEntity.addMessageRef(tx, command.getQueueName(), command.getMessageKey());
         }
     }
 
     private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
-        DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        qd.setQueueName(command.getQueueName());
+        DestinationEntity destination = rootEntity.getDestination(qd);
         if (destination != null) {
             if (destination.remove(tx, command.getMessageKey())) {
                 rootEntity.removeMessageRef(tx, command.getQueueName(), command.getMessageKey());
@@ -851,6 +898,7 @@
         // /////////////////////////////////////////////////////////////
         // Message related methods.
         // /////////////////////////////////////////////////////////////
+
         public void messageAdd(MessageRecord message) {
             if (message.getKey() < 0) {
                 throw new IllegalArgumentException("Key not set");
@@ -859,6 +907,7 @@
             bean.setMessageKey(message.getKey());
             bean.setMessageId(message.getMessageId());
             bean.setEncoding(message.getEncoding());
+            bean.setMessageSize(message.getSize());
             Buffer buffer = message.getBuffer();
             if (buffer != null) {
                 bean.setBuffer(buffer);
@@ -881,6 +930,7 @@
                 rc.setKey(bean.getMessageKey());
                 rc.setMessageId(bean.getMessageId());
                 rc.setEncoding(bean.getEncoding());
+                rc.setSize(bean.getMessageSize());
                 if (bean.hasBuffer()) {
                     rc.setBuffer(bean.getBuffer());
                 }
@@ -896,49 +946,65 @@
         // /////////////////////////////////////////////////////////////
         // Queue related methods.
         // /////////////////////////////////////////////////////////////
-        public void queueAdd(AsciiBuffer queueName) {
-            updates.add(new QueueAddBean().setQueueName(queueName));
+        public void queueAdd(QueueStore.QueueDescriptor descriptor) {
+            QueueAddBean update = new QueueAddBean();
+            update.setQueueName(descriptor.getQueueName());
+            update.setQueueType(descriptor.getQueueType());
+            update.setApplicationType(descriptor.getApplicationType());
+            AsciiBuffer parent = descriptor.getParent();
+            if (parent != null) {
+                update.setParentName(parent);
+                update.setPartitionId(descriptor.getPartitionKey());
+            }
+            updates.add(update);
         }
 
-        public void queueRemove(AsciiBuffer queueName) {
-            updates.add(new QueueRemoveBean().setQueueName(queueName));
+        public void queueRemove(QueueStore.QueueDescriptor descriptor) {
+            updates.add(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
         }
 
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
-            return rootEntity.queueList(tx(), firstQueueName, max);
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueue, int max) {
+            try {
+                return rootEntity.queueList(tx(), type, firstQueue, max);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
         }
 
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
-            DestinationEntity destination = rootEntity.getDestination(queueName);
-            if (destination == null) {
-                throw new KeyNotFoundException("queue key: " + queueName);
+        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueue, int max) {
+            try {
+                return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
             }
-            Long queueKey = destination.nextQueueKey();
+        }
+
+        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
             QueueAddMessageBean bean = new QueueAddMessageBean();
-            bean.setQueueName(queueName);
-            bean.setQueueKey(queueKey);
+            bean.setQueueName(queue.getQueueName());
+            bean.setQueueKey(record.getQueueKey());
             bean.setMessageKey(record.getMessageKey());
+            bean.setMessageSize(record.getSize());
             if (record.getAttachment() != null) {
                 bean.setAttachment(record.getAttachment());
             }
             updates.add(bean);
-            return queueKey;
         }
 
-        public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
             bean.setMessageKey(messageKey);
-            bean.setQueueName(queueName);
+            bean.setQueueName(queue.getQueueName());
             updates.add(bean);
         }
 
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
-            DestinationEntity destination = rootEntity.getDestination(queueName);
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+            DestinationEntity destination = rootEntity.getDestination(queue);
             if (destination == null) {
-                throw new KeyNotFoundException("queue key: " + queueName);
+                throw new KeyNotFoundException("queue key: " + queue);
             }
             try {
-                return destination.listMessages(tx(), firstQueueKey, max);
+                return destination.listMessages(tx(), firstQueueKey, maxQueueKey, max);
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }
@@ -1012,7 +1078,7 @@
             return null;
         }
 
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException {
         }
 
         public void transactionRollback(Buffer txid) throws KeyNotFoundException {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Mon Apr 27 18:40:44 2009
@@ -23,31 +23,45 @@
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.util.Marshaller;
 
 public class Marshallers {
-    
+
     public final static Marshaller<QueueRecord> QUEUE_RECORD_MARSHALLER = new Marshaller<QueueRecord>() {
-        
+
         public Class<QueueRecord> getType() {
             return QueueRecord.class;
         }
-    
+
         public QueueRecord readPayload(DataInput dataIn) throws IOException {
             QueueRecord rc = new QueueRecord();
             rc.setQueueKey(dataIn.readLong());
             rc.setMessageKey(dataIn.readLong());
-            if( dataIn.readBoolean() ) {
+            rc.setSize(dataIn.readInt());
+            if (dataIn.readBoolean()) {
+                rc.setTte(dataIn.readLong());
+            }
+            rc.setRedelivered(dataIn.readBoolean());
+            if (dataIn.readBoolean()) {
                 rc.setAttachment(BUFFER_MARSHALLER.readPayload(dataIn));
             }
             return rc;
         }
-    
+
         public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
             dataOut.writeLong(object.getQueueKey());
             dataOut.writeLong(object.getMessageKey());
-            if( object.getAttachment()!=null ) {
+            dataOut.writeInt(object.getSize());
+            if (object.getTte() >= 0) {
+                dataOut.writeBoolean(true);
+                dataOut.writeLong(object.getTte());
+            } else {
+                dataOut.writeBoolean(false);
+            }
+            dataOut.writeBoolean(object.isRedelivered());
+            if (object.getAttachment() != null) {
                 dataOut.writeBoolean(true);
                 BUFFER_MARSHALLER.writePayload(object.getAttachment(), dataOut);
             } else {
@@ -57,58 +71,89 @@
     };
 
     public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
-    
+
         public Class<Location> getType() {
             return Location.class;
         }
-    
+
         public Location readPayload(DataInput dataIn) throws IOException {
             Location rc = new Location();
             rc.setDataFileId(dataIn.readInt());
             rc.setOffset(dataIn.readInt());
             return rc;
         }
-    
+
         public void writePayload(Location object, DataOutput dataOut) throws IOException {
             dataOut.writeInt(object.getDataFileId());
             dataOut.writeInt(object.getOffset());
         }
     };
-    
-    
+
     public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new Marshaller<AsciiBuffer>() {
-    
+
         public Class<AsciiBuffer> getType() {
             return AsciiBuffer.class;
         }
-    
+
         public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
             byte data[] = new byte[dataIn.readShort()];
             dataIn.readFully(data);
             return new AsciiBuffer(data);
         }
-    
+
         public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
             dataOut.writeShort(object.length);
             dataOut.write(object.data, object.offset, object.length);
         }
     };
-    
+
     public final static Marshaller<Buffer> BUFFER_MARSHALLER = new Marshaller<Buffer>() {
-    
+
         public Class<Buffer> getType() {
             return Buffer.class;
         }
-    
+
         public Buffer readPayload(DataInput dataIn) throws IOException {
             byte data[] = new byte[dataIn.readShort()];
             dataIn.readFully(data);
             return new Buffer(data);
         }
-    
+
         public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
             dataOut.writeShort(object.length);
             dataOut.write(object.data, object.offset, object.length);
         }
     };
+
+    public final static Marshaller<QueueStore.QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new Marshaller<QueueStore.QueueDescriptor>() {
+
+        public Class<QueueStore.QueueDescriptor> getType() {
+            return QueueStore.QueueDescriptor.class;
+        }
+
+        public QueueStore.QueueDescriptor readPayload(DataInput dataIn) throws IOException {
+            QueueStore.QueueDescriptor descriptor = new QueueStore.QueueDescriptor();
+            descriptor.setQueueType(dataIn.readShort());
+            descriptor.setApplicationType(dataIn.readShort());
+            descriptor.setQueueName(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
+            if (dataIn.readBoolean()) {
+                descriptor.setParent(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
+                descriptor.setPartitionId(dataIn.readInt());
+            }
+            return descriptor;
+        }
+
+        public void writePayload(QueueStore.QueueDescriptor object, DataOutput dataOut) throws IOException {
+            dataOut.writeShort(object.getQueueType());
+            dataOut.writeShort(object.getApplicationType());
+            ASCII_BUFFER_MARSHALLER.writePayload(object.getQueueName(), dataOut);
+            if (object.getParent() != null) {
+                dataOut.writeBoolean(true);
+                ASCII_BUFFER_MARSHALLER.writePayload(object.getParent(), dataOut);
+                dataOut.writeInt(object.getPartitionKey());
+            } else {
+                dataOut.writeBoolean(false);
+            }
+        }
+    };
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Apr 27 18:40:44 2009
@@ -19,15 +19,18 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.LinkedList;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
@@ -45,8 +48,9 @@
         public RootEntity readPayload(DataInput is) throws IOException {
             RootEntity rc = new RootEntity();
             rc.state = is.readInt();
+            rc.maxMessageKey = is.readLong();
             rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
-            //rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
+            // rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
             rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
             if (is.readBoolean()) {
@@ -59,8 +63,9 @@
 
         public void writePayload(RootEntity object, DataOutput os) throws IOException {
             os.writeInt(object.state);
+            os.writeLong(object.maxMessageKey);
             os.writeLong(object.messageKeyIndex.getPageId());
-            //os.writeLong(object.locationIndex.getPageId());
+            // os.writeLong(object.locationIndex.getPageId());
             os.writeLong(object.destinationIndex.getPageId());
             os.writeLong(object.messageRefsIndex.getPageId());
             if (object.lastUpdate != null) {
@@ -79,12 +84,14 @@
     private long pageId;
     private int state;
     private Location lastUpdate;
+    private boolean loaded;
 
     // Message Indexes
-    private long nextMessageKey;
+    private long maxMessageKey;
     private BTreeIndex<Long, Location> messageKeyIndex;
-    //private BTreeIndex<Location, Long> locationIndex;
-    private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref count:
+    // private BTreeIndex<Location, Long> locationIndex;
+    private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
+    // count:
 
     // The destinations
     private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
@@ -103,7 +110,8 @@
         state = KahaDBStore.CLOSED_STATE;
 
         messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
-        //locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(), tx.allocate().getPageId());
+        // locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(),
+        // tx.allocate().getPageId());
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
         messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
 
@@ -116,11 +124,17 @@
         messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
         messageKeyIndex.setValueMarshaller(Marshallers.LOCATION_MARSHALLER);
         messageKeyIndex.load(tx);
-
-        //locationIndex.setPageFile(tx.getPageFile());
-        //locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
-        //locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        //locationIndex.load(tx);
+        // Update max message key:
+        Entry<Long, Location> last = messageKeyIndex.getLast(tx);
+        if (last != null) {
+            if (last.getKey() > maxMessageKey) {
+                maxMessageKey = last.getKey();
+            }
+        }
+        // locationIndex.setPageFile(tx.getPageFile());
+        // locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
+        // locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        // locationIndex.load(tx);
 
         destinationIndex.setPageFile(tx.getPageFile());
         destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
@@ -137,7 +151,75 @@
         for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
             Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
             entry.getValue().load(tx);
-            destinations.put(entry.getKey(), entry.getValue());
+            try {
+                addToDestinationCache(entry.getValue());
+            } catch (KeyNotFoundException e) {
+                //
+            }
+        }
+
+        // Build up the queue partition hierarchy:
+        try {
+            constructQueueHierarchy();
+        } catch (KeyNotFoundException e) {
+            throw new IOException("Inconsistent store", e);
+        }
+    }
+
+    /**
+     * Adds the destination to the destination cache
+     * 
+     * @param entity
+     *            The destination to cache.
+     * @throws KeyNotFoundException
+     *             If the parent queue could not be found.
+     */
+    private void addToDestinationCache(DestinationEntity entity) throws KeyNotFoundException {
+        QueueStore.QueueDescriptor queue = entity.getDescriptor();
+
+        // If loaded add a reference to us from the parent:
+        if (loaded) {
+            if (queue.getParent() != null) {
+                DestinationEntity parent = destinations.get(queue.getParent());
+                if (parent == null) {
+                    throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
+                }
+                parent.addPartition(entity);
+            }
+        }
+
+        destinations.put(queue.getQueueName(), entity);
+    }
+
+    private void removeFromDestinationCache(DestinationEntity entity) {
+        QueueStore.QueueDescriptor queue = entity.getDescriptor();
+
+        // If the queue is loaded remove the parent reference:
+        if (loaded) {
+            if (queue.getParent() != null) {
+                DestinationEntity parent = destinations.get(queue.getParent());
+                parent.removePartition(entity);
+            }
+        }
+        destinations.remove(queue.getQueueName());
+    }
+
+    /**
+     * Constructs the mapping of parent queues to child queues.
+     * 
+     * @throws KeyNotFoundException
+     */
+    private void constructQueueHierarchy() throws KeyNotFoundException {
+        for (DestinationEntity destination : destinations.values()) {
+            QueueStore.QueueDescriptor queue = destination.getDescriptor();
+            if (queue.getParent() != null) {
+                DestinationEntity parent = destinations.get(queue.getParent());
+                if (parent == null) {
+                    throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
+                } else {
+                    parent.addPartition(destination);
+                }
+            }
         }
     }
 
@@ -150,12 +232,15 @@
     // /////////////////////////////////////////////////////////////////
     // Message Methods.
     // /////////////////////////////////////////////////////////////////
-    public Long nextMessageKey() {
-        return nextMessageKey++;
+    public long getLastMessageTracking() {
+        return maxMessageKey;
     }
 
     public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
         long id = command.getMessageKey();
+        if (id > maxMessageKey) {
+            maxMessageKey = id;
+        }
         Location previous = messageKeyIndex.put(tx, id, location);
         if (previous != null) {
             // Message existed.. undo the index update we just did. Chances
@@ -163,13 +248,13 @@
             messageKeyIndex.put(tx, id, previous);
         }
     }
-    
+
     public void messageRemove(Transaction tx, Long messageKey) throws IOException {
-        //Location location = messageKeyIndex.remove(tx, messageKey);
+        // Location location = messageKeyIndex.remove(tx, messageKey);
         messageKeyIndex.remove(tx, messageKey);
-        //if (location != null) {
-        //    locationIndex.remove(tx, location);
-        //}
+        // if (location != null) {
+        // locationIndex.remove(tx, location);
+        // }
     }
 
     public Location messageGetLocation(Transaction tx, Long messageKey) {
@@ -200,7 +285,7 @@
             if (refs != null) {
                 if (refs.longValue() <= 1) {
                     messageRefsIndex.remove(tx, messageKey);
-                    //If this is the last record remove, the message
+                    // If this is the last record remove, the message
                     messageRemove(tx, messageKey);
                 } else {
                     messageRefsIndex.put(tx, messageKey, new Long(refs.longValue() - 1));
@@ -214,51 +299,74 @@
     // /////////////////////////////////////////////////////////////////
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
-    public void queueAdd(Transaction tx, AsciiBuffer queueName) throws IOException {
-        if (destinationIndex.get(tx, queueName) == null) {
+    public void queueAdd(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+        if (destinationIndex.get(tx, queue.getQueueName()) == null) {
             DestinationEntity rc = new DestinationEntity();
+            rc.setQueueDescriptor(queue);
             rc.allocate(tx);
-            destinationIndex.put(tx, queueName, rc);
+            destinationIndex.put(tx, queue.getQueueName(), rc);
             rc.load(tx);
-            destinations.put(queueName, rc);
+            try {
+                addToDestinationCache(rc);
+            } catch (KeyNotFoundException e) {
+                throw new Store.FatalStoreException("Inconsistent QueueStore: " + e.getMessage(), e);
+            }
         }
     }
 
-    public void queueRemove(Transaction tx, AsciiBuffer queueName) throws IOException {
-        DestinationEntity destination = destinations.get(queueName);
+    public void queueRemove(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+        DestinationEntity destination = destinations.get(queue.getQueueName());
         if (destination != null) {
-            //Remove the message references. 
-            //TODO this should probably be optimized. 
+            // Remove the message references.
+            // TODO this should probably be optimized.
             Iterator<Entry<Long, Long>> messages = destination.listTrackingNums(tx);
-            while(messages.hasNext())
-            {
+            while (messages.hasNext()) {
                 Long messageKey = messages.next().getKey();
-                removeMessageRef(tx, queueName, messageKey);
+                removeMessageRef(tx, queue.getQueueName(), messageKey);
             }
-            destinationIndex.remove(tx, queueName);
-            destinations.remove(queueName);
+            destinationIndex.remove(tx, queue.getQueueName());
+            removeFromDestinationCache(destination);
             destination.deallocate(tx);
         }
     }
 
-    public DestinationEntity getDestination(AsciiBuffer queueName) {
-        return destinations.get(queueName);
+    public DestinationEntity getDestination(QueueStore.QueueDescriptor queue) {
+        return destinations.get(queue.getQueueName());
     }
 
-    public Iterator<AsciiBuffer> queueList(Transaction tx, AsciiBuffer firstQueueName, int max) {
-        return list(destinations, firstQueueName, max);
-    }
+    public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueStore.QueueDescriptor firstQueue, int max) throws IOException {
+        LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
+        Collection<DestinationEntity> values = (firstQueue == null ? destinations.values() : destinations.tailMap(firstQueue.getQueueName()).values());
 
-    static private <Key, Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
-        ArrayList<Key> rc = new ArrayList<Key>(max);
-        Set<Key> keys = (first == null ? map : map.tailMap(first)).keySet();
-        for (Key buffer : keys) {
-            if (rc.size() >= max) {
+        for (DestinationEntity de : values) {
+            if (results.size() >= max) {
                 break;
             }
-            rc.add(buffer);
+
+            if (type == -1 || de.getDescriptor().getApplicationType() == type) {
+                results.add(queryQueue(tx, de));
+            }
         }
-        return rc.iterator();
+        return results.iterator();
+    }
+
+    private final QueueQueryResult queryQueue(Transaction tx, DestinationEntity de) throws IOException {
+
+        QueueQueryResultImpl result = new QueueQueryResultImpl();
+        result.count = de.getCount(tx);
+        result.size = de.getSize(tx);
+        result.firstSequence = de.getFirstSequence(tx);
+        result.lastSequence = de.getLastSequence(tx);
+        result.desc = de.getDescriptor().copy();
+        Iterator<DestinationEntity> partitions = de.getPartitions();
+        if (partitions != null && partitions.hasNext()) {
+            result.partitions = new LinkedList<QueueQueryResult>();
+            while (partitions.hasNext()) {
+                result.partitions.add(queryQueue(tx, destinations.get(partitions.next().getDescriptor().getQueueName())));
+            }
+        }
+
+        return result;
     }
 
     public long getPageId() {
@@ -285,4 +393,37 @@
         this.lastUpdate = lastUpdate;
     }
 
+    private static class QueueQueryResultImpl implements QueueQueryResult {
+
+        QueueStore.QueueDescriptor desc;
+        Collection<QueueQueryResult> partitions;
+        long size;
+        int count;
+        long firstSequence;
+        long lastSequence;
+
+        public QueueStore.QueueDescriptor getDescriptor() {
+            return desc;
+        }
+
+        public Collection<QueueQueryResult> getPartitions() {
+            return partitions;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        public long getFirstSequence() {
+            return firstSequence;
+        }
+
+        public long getLastSequence() {
+            return lastSequence;
+        }
+    }
 }
\ No newline at end of file



Mime
View raw message