activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r758329 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq: broker/ broker/openwire/ broker/stomp/ broker/store/ queue/
Date Wed, 25 Mar 2009 16:20:56 GMT
Author: chirino
Date: Wed Mar 25 16:20:40 2009
New Revision: 758329

URL: http://svn.apache.org/viewvc?rev=758329&view=rev
Log:
Applying Colin's patch for https://issues.apache.org/activemq/browse/AMQ-2177
Thanks!


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=758329&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Wed Mar 25 16:20:40 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.PersistentQueue;
+
+/**
+ * Base class for deliveries that the broker routes to its queues. It records
+ * which persistent queues asked for the message to be saved so that one store
+ * operation can cover all of them once routing has finished.
+ */
+public abstract class BrokerMessageDelivery implements MessageDelivery {
+
+    // Queues that requested persistence before the message was saved. Created
+    // lazily, so this stays null until persist() is first called.
+    HashSet<PersistentQueue<MessageDelivery>> persistentTargets;
+    // Indicates whether or not the message has already been saved
+    // if it hasn't in memory updates can be done.
+    boolean saved = false;
+    // Key assigned by the store, or -1 while the message is unsaved.
+    long storeTracking = -1;
+    // Assigned by Router.route() before the message reaches any target.
+    BrokerDatabase store;
+
+    /**
+     * @return Always false; this class never reports a store origin.
+     */
+    public final boolean isFromStore() {
+        return false;
+    }
+
+    /**
+     * Requests that this message be saved for the given queue. Until the
+     * message has been saved the request is only recorded in memory;
+     * afterwards it is forwarded to the store straight away.
+     *
+     * @param queue
+     *            The queue that needs the message persisted.
+     */
+    public final void persist(PersistentQueue<MessageDelivery> queue) {
+        synchronized (this) {
+            if (!saved) {
+                if (persistentTargets == null) {
+                    persistentTargets = new HashSet<PersistentQueue<MessageDelivery>>();
+                }
+                persistentTargets.add(queue);
+                return;
+            }
+        }
+
+        //TODO probably need to pass in the saving queue's source controller here
+        //and treat it like it is dispatching to the saver queue.
+        store.saveMessage(this, queue, null);
+    }
+
+    /**
+     * Withdraws the given queue's interest in this message. Before the
+     * message has been saved this only updates the in-memory set;
+     * afterwards the delete is forwarded to the store.
+     *
+     * @param queue
+     *            The queue that no longer needs the message.
+     */
+    public final void delete(PersistentQueue<MessageDelivery> queue) {
+        synchronized (this) {
+            if (!saved) {
+                // The set is created lazily, so it is still null when no
+                // queue has called persist() yet.
+                if (persistentTargets != null) {
+                    persistentTargets.remove(queue);
+                }
+                return;
+            }
+        }
+
+        store.deleteMessage(this, queue);
+    }
+
+    /**
+     * Marks the message as saved and records its store key. From this
+     * point on persist() and delete() go to the store directly.
+     *
+     * @param storeTracking
+     *            The key the store assigned to this message.
+     */
+    public synchronized void beginStore(long storeTracking) {
+        saved = true;
+        this.storeTracking = storeTracking;
+    }
+
+    public long getStoreTracking() {
+        return storeTracking;
+    }
+
+    /**
+     * @return The queues that requested persistence; empty, never null.
+     */
+    public Collection<PersistentQueue<MessageDelivery>> getPersistentQueues() {
+        if (persistentTargets == null) {
+            return Collections.<PersistentQueue<MessageDelivery>> emptySet();
+        }
+        return persistentTargets;
+    }
+
+    /**
+     * Saves the message if at least one queue requested persistence while it
+     * was being routed.
+     *
+     * @param controller
+     *            The source's controller, passed on to the store.
+     */
+    public void persistIfNeeded(ISourceController<?> controller) {
+        boolean saveNeeded;
+        synchronized (this) {
+            if (persistentTargets == null || persistentTargets.isEmpty()) {
+                // Nothing to write and no store operation will run, so flip
+                // the flag here; later requests go straight to the store.
+                saveNeeded = false;
+                saved = true;
+            } else {
+                // beginStore() flips the flag once the operation executes.
+                saveNeeded = true;
+            }
+        }
+
+        if (saveNeeded) {
+            store.persistReceivedMessage(this, controller);
+        }
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Wed Mar 25 16:20:40 2009
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface DeliveryTarget {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Wed Mar 25 16:20:40 2009
@@ -19,6 +19,7 @@
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.PersistentQueue;
 
 public interface MessageDelivery {
 
@@ -37,17 +38,34 @@
     public boolean isPersistent();
 
     /**
+     * @return True if this message was read from the store.
+     */
+    public boolean isFromStore();
+
+    /**
      * Returns true if this message requires acknowledgement.
      */
     public boolean isResponseRequired();
-    
+
     /**
-     * Called when the message's persistence requirements have
-     * been met. This method must not block. 
+     * Called when the message's persistence requirements have been met. This
+     * method must not block.
      */
     public void onMessagePersisted();
-    
+
     public Store.Session.MessageRecord createMessageRecord();
 
     public Buffer getTransactionId();
+
+    public void persist(PersistentQueue<MessageDelivery> queue);
+
+    public void delete(PersistentQueue<MessageDelivery> queue);
+    
+    /**
+     * Gets the tracking number used to identify this message in the message
+     * store.
+     * 
+     * @return The store tracking or -1 if not set.
+     */
+    public long getStoreTracking();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Wed Mar 25 16:20:40 2009
@@ -63,32 +63,35 @@
         domain.bind(destination.getName(), dt);
     }
 
-    public void route(final MessageDelivery msg, ISourceController<?> controller) {
+    public void route(final BrokerMessageDelivery msg, ISourceController<?> controller) {
 
-//        final Buffer transactionId = msg.getTransactionId();
-//        if( msg.isPersistent() ) {
-//            VoidCallback<RuntimeException> tx = new VoidCallback<RuntimeException>() {
-//                @Override
-//                public void run(Session session) throws RuntimeException {
-//                    Long messageKey = session.messageAdd(msg.createMessageRecord());
-//                    if( transactionId!=null ) {
-//                        session.transactionAddMessage(transactionId, messageKey);
-//                    }
-//                }
-//            };
-//            Runnable onFlush = new Runnable() {
-//                public void run() {
-//                    if( msg.isResponseRequired() ) {
-//                        // Let the client know the broker got the message.
-//                        msg.onMessagePersisted();
-//                    }
-//                }
-//            };
-//            virtualHost.getStore().execute(tx, onFlush);
-//        }
-//        
+        // final Buffer transactionId = msg.getTransactionId();
+        // if( msg.isPersistent() ) {
+        // VoidCallback<RuntimeException> tx = new
+        // VoidCallback<RuntimeException>() {
+        // @Override
+        // public void run(Session session) throws RuntimeException {
+        // Long messageKey = session.messageAdd(msg.createMessageRecord());
+        // if( transactionId!=null ) {
+        // session.transactionAddMessage(transactionId, messageKey);
+        // }
+        // }
+        // };
+        // Runnable onFlush = new Runnable() {
+        // public void run() {
+        // if( msg.isResponseRequired() ) {
+        // // Let the client know the broker got the message.
+        // msg.onMessagePersisted();
+        // }
+        // }
+        // };
+        // virtualHost.getStore().execute(tx, onFlush);
+        // }
+        //        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
 
+        msg.store = getVirtualHost().getDatabase();
+        
         // TODO:
         // Consider doing some caching of this target list. Most producers
         // always send to the same destination.
@@ -97,19 +100,23 @@
             if (msg.isResponseRequired()) {
                 // We need to ack the message once we ensure we won't loose it.
                 // We know we won't loose it once it's persisted or delivered to
-                // a consumer Setup a callback to get notifed once one of those happens.
+                // a consumer. Set up a callback to get notified once one of those
+                // happens.
                 if (!msg.isPersistent()) {
                     // Let the client know the broker got the message.
                     msg.onMessagePersisted();
                 }
             }
 
-            // Deliver the message to all the targets..
+            //The sinks will request persistence via MessageDelivery.persist()
+            //if they require persistence:
             for (DeliveryTarget dt : targets) {
                 if (dt.match(msg)) {
                     dt.getSink().add(msg, controller);
                 }
             }
+            
+            msg.persistIfNeeded(controller);
 
         } else {
             // Let the client know we got the message even though there

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Wed Mar 25 16:20:40 2009
@@ -55,8 +55,8 @@
         return router;
     }
     public void setRouter(Router router) {
-        this.router.setVirtualHost(this);
         this.router = router;
+        this.router.setVirtualHost(this);
     }
 
     public void start() throws Exception {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Wed Mar 25 16:20:40 2009
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.broker.openwire;
 
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
-public class OpenWireMessageDelivery implements MessageDelivery {
+public class OpenWireMessageDelivery extends BrokerMessageDelivery {
 
     static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Wed Mar 25 16:20:40 2009
@@ -16,15 +16,15 @@
  */
 package org.apache.activemq.broker.stomp;
 
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 
-public class StompMessageDelivery implements MessageDelivery {
+public class StompMessageDelivery extends BrokerMessageDelivery {
 
     static final private AsciiBuffer ENCODING = new AsciiBuffer("stomp");
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Wed Mar 25 16:20:40 2009
@@ -19,21 +19,27 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Semaphore;
 
-import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.protocol.ProtocolHandler;
+import org.apache.activemq.broker.protocol.ProtocolHandlerFactory;
 import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.Session.QueueRecord;
 import org.apache.activemq.broker.store.memory.MemoryStore;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.queue.ExclusiveQueue;
 import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.PersistentQueue;
 import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +57,8 @@
     private final FlowReadyListener<Operation> enqueueListener;
     private DatabaseListener listener;
 
+    private HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+
     public interface DatabaseListener {
         /**
          * Called if there is a catastrophic problem with the database.
@@ -61,6 +69,21 @@
         public void onDatabaseException(IOException ioe);
     }
 
+    /**
+     * Holder of a restored message to be passed to a
+     * {@link MessageRestoreListener}. This allows the demarshalling to be done
+     * by the listener instead of the database worker.
+     * 
+     * @author cmacnaug
+     */
+    public interface RestoredMessage {
+        MessageDelivery getMessageDelivery();
+    }
+
+    public interface MessageRestoreListener {
+        public void messagesRestored(Collection<RestoredMessage> msgs);
+    }
+
     public BrokerDatabase() {
         storeLimiter = new SizeLimiter<Operation>(1024 * 512, 0) {
             public int getElementSize(Operation op) {
@@ -114,8 +137,47 @@
         }
     }
 
-    public void persistReceivedMessage(MessageDelivery delivery, Collection<DeliveryTarget> targets, ISourceController<?> source) {
-        add(new AddMessageOperation(delivery, targets), source, true);
+    /**
+     * Saves a message for all of the recipients in the
+     * {@link BrokerMessageDelivery}.
+     * 
+     * @param delivery
+     *            The delivery.
+     * @param source
+     *            The source's controller.
+     */
+    public void persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
+        add(new AddMessageOperation(delivery), source, true);
+    }
+
+    /**
+     * Saves a Message for a single queue.
+     * 
+     * @param delivery
+     *            The delivery
+     * @param queue
+     *            The queue
+     * @param source
+     *            The source initiating the save or null, if there isn't one.
+     */
+    public void saveMessage(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue, ISourceController<?> source) {
+        add(new AddMessageOperation(delivery, queue), source, false);
+    }
+
+    /**
+     * Deletes the given message from the store for the given queue.
+     * 
+     * @param delivery
+     *            The delivery.
+     * @param queue
+     *            The queue.
+     */
+    public void deleteMessage(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue) {
+        opQueue.add(new DeleteMessageOperation(delivery, queue), null);
+    }
+
+    public void restoreMessages(PersistentQueue<MessageDelivery> queue, long first, int max, MessageRestoreListener listener) {
+        opQueue.add(new RestoreMessageOperation(queue, first, max, listener), null);
     }
 
     /**
@@ -156,25 +218,27 @@
                     continue;
                 }
             }
-            
+
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
                 final ArrayList<Operation> processedQueue = new ArrayList<Operation>();
                 try {
-                    store.execute(new Store.VoidCallback<Exception>(){
+                    store.execute(new Store.VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
-                            
-                            // Try to execute the operation against the session...
+
+                            // Try to execute the operation against the
+                            // session...
                             try {
                                 firstOp.execute(session);
                                 processedQueue.add(firstOp);
                             } catch (CancellationException ignore) {
                             }
 
-                            // See if we can batch up some additional operations in 
-                            // this transaction.  
-                            
+                            // See if we can batch up some additional operations
+                            // in
+                            // this transaction.
+
                             Operation op;
                             synchronized (opQueue) {
                                 op = opQueue.poll();
@@ -205,7 +269,7 @@
                     for (Operation processed : processedQueue) {
                         processed.onRollback(e);
                     }
-                    
+
                 }
             }
         }
@@ -321,50 +385,183 @@
         public void onCommit() {
         }
 
-        public void onRollback() {
+        /**
+         * Called after {@link #execute(Session)} is called and the
+         * operation has been rolled back.
+         */
+        public void onRollback(Throwable error) {
+
+        }
+    }
+
+    /** Removes a message from one queue's persistent store when executed. */
+    private class DeleteMessageOperation extends OperationBase {
+        private final MessageDelivery delivery;
+        private PersistentQueue<MessageDelivery> queue;
+        public DeleteMessageOperation(MessageDelivery delivery, PersistentQueue<MessageDelivery> queue) {
+            this.delivery = delivery;
+            this.queue = queue;
+        }
+
+        @Override
+        public int getLimiterSize() {
+            // Might consider bumping this up to avoid too much accumulation?
+            return 0;
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+            try {
+                session.queueRemoveMessage(queue.getPeristentQueueName(), delivery.getStoreTracking());
+            } catch (KeyNotFoundException e) {
+                // TODO Probably doesn't always mean an error, it is possible
+                // that the queue has been deleted, in which case its messages
+                // will have been deleted, too.
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void onRollback(Throwable error) {
+        }
+
+        @Override
+        public void onCommit() {
+            // NOTE(review): signals "persisted" after a delete - confirm intended.
+            delivery.onMessagePersisted();
+        }
+    }
+
+    /** Reads a batch of a queue's stored messages and passes them to the listener on commit. */
+    private class RestoreMessageOperation extends OperationBase {
+        private PersistentQueue<MessageDelivery> queue;
+        private long firstKey;
+        private int maxRecords;
+        private MessageRestoreListener listener;
+        private Collection<RestoredMessage> msgs = new ArrayList<RestoredMessage>();
+
+        RestoreMessageOperation(PersistentQueue<MessageDelivery> queue, long firstKey, int maxRecords, MessageRestoreListener listener) {
+            this.queue = queue;
+            this.firstKey = firstKey;
+            this.maxRecords = maxRecords;
+            this.listener = listener;
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+            Iterator<QueueRecord> records = null;
+            try {
+                records = session.queueListMessagesQueue(queue.getPeristentQueueName(), firstKey, maxRecords);
+            } catch (KeyNotFoundException e) {
+                // Nothing stored under this queue name: report an empty batch.
+                return;
+            }
+            while (records.hasNext()) {
+                RestoredMessageImpl rm = new RestoredMessageImpl();
+                // TODO should update jms redelivery here.
+                rm.qRecord = records.next();
+                rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
+                rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
+                if (rm.handler == null) {
+                    try {
+                        rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
+                        protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
+                    } catch (Throwable thrown) {
+                        throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+                    }
+                }
+                // Add the record to the batch that onCommit() hands to the listener.
+                msgs.add(rm);
+            }
+        }
+
+        @Override
+        public void onRollback(Throwable error) {
+        }
+
+        @Override
+        public void onCommit() {
+            listener.messagesRestored(msgs);
         }
     }
 
     private class AddMessageOperation extends OperationBase {
+
+        private final BrokerMessageDelivery brokerDelivery;
+
         private final MessageDelivery delivery;
-        private final Collection<DeliveryTarget> targets;
-        
-        public AddMessageOperation(MessageDelivery delivery, Collection<DeliveryTarget> targets) {
+        private final PersistentQueue<MessageDelivery> target;
+
+        public AddMessageOperation(BrokerMessageDelivery delivery) {
+            this.brokerDelivery = delivery;
+            this.delivery = delivery;
+            target = null;
+        }
+
+        public AddMessageOperation(MessageDelivery delivery, PersistentQueue<MessageDelivery> target) {
+            this.brokerDelivery = null;
             this.delivery = delivery;
-            this.targets = targets;
-            
+            this.target = target;
         }
 
+        @Override
         public int getLimiterSize() {
             return delivery.getFlowLimiterSize();
         }
 
         @Override
         protected void doExcecute(Session session) {
-            // TODO need to get at protocol buffer.
-            
-            MessageRecord record = delivery.createMessageRecord();
-            Long key = session.messageAdd(record);
-            for(DeliveryTarget target : targets)
-            {
+
+            if (target == null) {
+                MessageRecord record = delivery.createMessageRecord();
+                Long key = session.messageAdd(record);
+                brokerDelivery.beginStore(key);
+
+                for (PersistentQueue<MessageDelivery> target : brokerDelivery.getPersistentQueues()) {
+                    try {
+                        Session.QueueRecord queueRecord = new Session.QueueRecord();
+                        queueRecord.setAttachment(null);
+                        queueRecord.setMessageKey(key);
+                        session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
+
+                    } catch (KeyNotFoundException e) {
+                        e.printStackTrace();
+                    }
+                }
+            } else {
+
+                MessageRecord record = delivery.createMessageRecord();
+                Long key = session.messageAdd(record);
                 try {
                     Session.QueueRecord queueRecord = new Session.QueueRecord();
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(key);
-                    session.queueAddMessage(target.getPersistentQueueName(), queueRecord);
+                    session.queueAddMessage(target.getPeristentQueueName(), queueRecord);
                 } catch (KeyNotFoundException e) {
                     e.printStackTrace();
                 }
             }
         }
 
+        @Override
         public void onRollback(Throwable error) {
-            // TODO Auto-generated method stub
+            error.printStackTrace();
         }
 
+        @Override
         public void onCommit() {
             delivery.onMessagePersisted();
         }
 
     }
+
+    private class RestoredMessageImpl implements RestoredMessage {
+        QueueRecord qRecord;
+        MessageRecord mRecord;
+        ProtocolHandler handler;
+
+        public MessageDelivery getMessageDelivery() {
+            return handler.createMessageDelivery(mRecord);
+        }
+    }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java?rev=758329&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java Wed Mar 25 16:20:40 2009
@@ -0,0 +1,113 @@
+package org.apache.activemq.broker.store;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.activemq.queue.PersistentQueue;
+import org.apache.activemq.queue.QueueStoreHelper;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+
+public class MessageDeliveryStoreHelper implements QueueStoreHelper<MessageDelivery>, Dispatchable, BrokerDatabase.MessageRestoreListener {
+
+    private final BrokerDatabase database;
+    private final PersistentQueue<MessageDelivery> queue;
+    private final DispatchContext dispatchContext;
+    private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
+    private final IFlowRelay<MessageDelivery> restoreRelay;
+    private final SizeLimiter<MessageDelivery> restoreLimiter;
+    private final IFlowController<MessageDelivery> controller;
+    private final FlowUnblockListener<MessageDelivery> unblockListener;
+
+    private int RESTORE_BATCH_SIZE = 50;
+    
+    private boolean restoreComplete;
+
+    private static enum State {
+        STOPPED, RESTORING, RESTORED
+    };
+
+    private State state = State.RESTORING;
+
+    MessageDeliveryStoreHelper(BrokerDatabase database, PersistentQueue<MessageDelivery> queue, IDispatcher dispatcher) {
+        this.database = database;
+        this.queue = queue;
+        Flow flow = new Flow("MessageRestorer-" + queue.getPeristentQueueName(), false);
+        restoreLimiter = new SizeLimiter<MessageDelivery>(1000, 500) {
+            @Override
+            public int getElementSize(MessageDelivery msg) {
+                return msg.getFlowLimiterSize();
+            }
+        };
+        restoreRelay = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), restoreLimiter);
+        controller = restoreRelay.getFlowController(flow);
+        dispatchContext = dispatcher.register(this, flow.getFlowName());
+
+        unblockListener = new FlowUnblockListener<MessageDelivery>() {
+            public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+                dispatchContext.requestDispatch();
+            }
+        };
+    }
+
+    public void delete(MessageDelivery elem, boolean flush) {
+        elem.delete(queue);
+    }
+
+    public void save(MessageDelivery elem, boolean flush) {
+        elem.persist(queue);
+    }
+
+    public boolean hasStoredElements() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public void startLoadingQueue() {
+        // TODO Auto-generated method stub
+    }
+
+    public void stopLoadingQueue() {
+        // TODO Auto-generated method stub
+    }
+
+    public boolean dispatch() {
+
+        RestoredMessage restored = restoredMsgs.poll();
+        if (restored == null || restoreComplete) {
+            return true;
+        }
+
+        if (controller.isSinkBlocked()) {
+            if (controller.addUnblockListener(unblockListener)) {
+                return true;
+            }
+        } else {
+            queue.addFromStore(restored.getMessageDelivery(), controller);
+        }
+
+        return false;
+    }
+
+    public void messagesRestored(Collection<RestoredMessage> msgs) {
+        synchronized (restoredMsgs) {
+            if (!msgs.isEmpty()) {
+                restoredMsgs.addAll(msgs);
+            } else {
+
+            }
+        }
+
+        dispatchContext.requestDispatch();
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Wed Mar 25 16:20:40 2009
@@ -25,7 +25,10 @@
 import org.apache.activemq.flow.AbstractLimitedFlowSource;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
 /**
  * Base class for a {@link Dispatchable} {@link FlowControllable}
@@ -33,7 +36,7 @@
  * 
  * @param <E>
  */
-public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements PersistentQueue<E>, FlowControllable<E>, IFlowQueue<E>, Dispatchable {
 
     protected IDispatcher dispatcher;
     protected DispatchContext dispatchContext;
@@ -41,6 +44,8 @@
     private boolean notifyReady = false;
     protected boolean dispatching = false;
     protected int dispatchPriority = 0;
+    protected QueueStoreHelper<E> storeHelper;
+    AsciiBuffer persistentQueueName;
 
     AbstractFlowQueue() {
         super();
@@ -50,6 +55,30 @@
         super(name);
     }
 
+    public final void add(E elem, ISourceController<?> source) {
+        checkSave(elem, source);
+        getSinkController(elem, source).add(elem, source);
+    }
+
+    public final boolean offer(E elem, ISourceController<?> source) {
+        if(getSinkController(elem, source).offer(elem, source))
+        {
+            checkSave(elem, source);
+            return true;
+        }
+        return false;
+    }
+    
+    private void checkSave(E elem, ISourceController<?> source) 
+    {
+        if(storeHelper != null && isElementPersistent(elem))
+        {
+            storeHelper.save(elem, true);
+        }
+    }
+
+    protected abstract ISinkController<E> getSinkController(E elem, ISourceController<?> source);
+
     public final boolean dispatch() {
 
         // while (pollingDispatch());
@@ -147,7 +176,60 @@
         notifyReady = false;
     }
 
+    /**
+     * Enables persistence for this queue.
+     */
+    public void enablePersistence(QueueStoreHelper<E> storeHelper) {
+        this.storeHelper = storeHelper;
+    }
+
+    /**
+     * Called when an element is added from the queue's store.
+     * 
+     * @param elem
+     *            The element
+     * @param controller
+     *            The store controller.
+     */
+    public void addFromStore(E elem, ISourceController<?> controller) {
+        add(elem, controller);
+    }
+
+    /**
+     * Called when there are no more elements to be loaded from the store.
+     */
+    public void onQueueLoadComplete() {
+
+    }
+
+    /**
+     * Subclasses should override this to indicate whether the given element
+     * requires saving to the store.
+     * 
+     * @param elem
+     *            The element to check.
+     */
+    public boolean isElementPersistent(E elem) {
+        return false;
+    }
+
     public String toString() {
         return getResourceName();
     }
+
+    /**
+     * Returns the queue name used to identify the queue in the store.
+     * 
+     * @return the persistent queue name; may be null when no resource name is available
+     */
+    public AsciiBuffer getPeristentQueueName() {
+        if (persistentQueueName == null) {
+            String name = getResourceName();
+            if (name != null) {
+                persistentQueueName = new AsciiBuffer(name);
+            }
+        }
+        return persistentQueueName;
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Wed Mar 25 16:20:40 2009
@@ -19,6 +19,7 @@
 import org.apache.activemq.dispatch.PriorityLinkedList;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PriorityFlowController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
@@ -57,15 +58,8 @@
 
     }
 
-    public boolean offer(E elem, ISourceController<?> source) {
-        return controller.offer(elem, source);
-    }
-
-    /**
-     * Performs a limited add to the queue.
-     */
-    public final void add(E elem, ISourceController<?> source) {
-        controller.add(elem, source);
+    protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
+        return controller;
     }
 
     /**

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Wed Mar 25 16:20:40 2009
@@ -21,6 +21,7 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 
 public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
@@ -41,15 +42,9 @@
         super.onFlowOpened(controller);
     }
 
-    public boolean offer(E elem, ISourceController<?> source) {
-        return controller.offer(elem, source);
-    }
 
-    /**
-     * Performs a limited add to the queue.
-     */
-    public final void add(E elem, ISourceController<?> source) {
-        controller.add(elem, source);
+    protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
+        return controller;
     }
 
     /**

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Wed Mar 25 16:20:40 2009
@@ -108,17 +108,10 @@
         super.onFlowOpened(sinkController);
     }
 
-    public boolean offer(E elem, ISourceController<?> source) {
-        return sinkController.offer(elem, source);
-    }
-
-    /**
-     * Performs a limited add to the queue.
-     */
-    public final void add(E elem, ISourceController<?> source) {
-        sinkController.add(elem, source);
+    protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
+        return sinkController;
     }
-
+    
     /**
      * Called when the controller accepts a message for this queue.
      */

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Wed Mar 25 16:20:40 2009
@@ -24,6 +24,7 @@
 import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.kahadb.util.LinkedNode;
@@ -48,20 +49,16 @@
         throw new UnsupportedOperationException();
     }
 
-    public boolean offer(E elem, ISourceController<?> source) {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
-
-    public synchronized void add(E elem, ISourceController<?> source) {
+    protected synchronized final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
         SingleFlowQueue queue = flowQueues.get(source.getFlow());
         if (queue == null) {
             queue = new SingleFlowQueue(source.getFlow(), new SizeLimiter<E>(perFlowWindow, resumeThreshold));
             flowQueues.put(source.getFlow(), queue);
             super.onFlowOpened(queue.controller);
         }
-        queue.enqueue(elem, source);
+        return queue.controller;
     }
-
+    
     public boolean pollingDispatch() {
         SingleFlowQueue queue = null;
         E elem = null;
@@ -88,23 +85,21 @@
 
     public final E poll() {
         synchronized (this) {
-            synchronized (this) {
-                SingleFlowQueue queue = peekReadyQueue();
-                if (queue == null) {
-                    return null;
-                }
-
-                E elem = queue.poll();
-                if (elem == null) {
+            SingleFlowQueue queue = peekReadyQueue();
+            if (queue == null) {
+                return null;
+            }
 
-                    unreadyQueue(queue);
-                    return null;
-                }
+            E elem = queue.poll();
+            if (elem == null) {
 
-                // rotate to have fair dispatch.
-                queue.getList().rotate();
-                return elem;
+                unreadyQueue(queue);
+                return null;
             }
+
+            // rotate to have fair dispatch.
+            queue.getList().rotate();
+            return elem;
         }
     }
 

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java?rev=758329&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java Wed Mar 25 16:20:40 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public interface PersistentQueue<E> {
+
+    /**
+     * When the memory size of the queue exceeds this limit, elements are
+     * spooled to disk.
+     * 
+     * @param extent
+     *            The save extent.
+     * 
+     *            public PersistentQueue<E> setSaveExent(long extent);
+     */
+
+    /**
+     * Gets the save extent associated with the queue.
+     * 
+     * @return the save extent
+     * 
+     *         public long getSaveExent(long extent);
+     */
+
+    /**
+     * Enables persistence for this queue.
+     */
+    public void enablePersistence(QueueStoreHelper<E> storeHelper);
+
+    /**
+     * Called when an element is added from the queue's store.
+     * 
+     * @param elem
+     *            The element
+     * @param controller
+     *            The store controller.
+     */
+    public void addFromStore(E elem, ISourceController<?> controller);
+
+    /**
+     * Called when there are no more elements to be loaded from the store.
+     */
+    public void onQueueLoadComplete();
+
+    /**
+     * Implementors implement this to indicate whether or not the given element
+     * requires saving to the store.
+     * 
+     * @param elem
+     *            The element to check.
+     */
+    public boolean isElementPersistent(E elem);
+    
+    /**
+     * Returns the queue name used to identify the queue in the store.
+     * @return the persistent queue name
+     */
+    public AsciiBuffer getPeristentQueueName();
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java?rev=758329&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java Wed Mar 25 16:20:40 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+public interface QueueStoreHelper<E> {
+
+    /**
+     * Requests that the helper start loading elements
+     * saved for this queue. 
+     * @param queue
+     */
+    public void startLoadingQueue();
+    
+    /**
+     * Stop the helper from loading more elements stored for 
+     * the queue
+     * @param queue The queue.
+     */
+    public void stopLoadingQueue();
+    
+    /**
+     * Checks to see if there are elements in the store 
+     * for this queue. 
+     * @return true if there are elements in the store for this queue
+     */
+    public boolean hasStoredElements();
+    
+    /**
+     * Deletes a given element for this queue from the store. 
+     * @param elem The elem to delete. 
+     */
+    public void delete(E elem, boolean flush);
+    
+    /**
+     * Saves an element to the store. 
+     * @param elem The element to be saved. 
+     */
+    public void save(E elem, boolean flush);
+    
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Wed Mar 25 16:20:40 2009
@@ -126,15 +126,8 @@
         super.onFlowOpened(sinkController);
     }
 
-    public boolean offer(V elem, ISourceController<?> source) {
-        return sinkController.offer(elem, source);
-    }
-
-    /**
-     * Performs a limited add to the queue.
-     */
-    public final void add(V value, ISourceController<?> source) {
-        sinkController.add(value, source);
+    protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
+        return sinkController;
     }
 
     /**

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java?rev=758329&r1=758328&r2=758329&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java Wed Mar 25 16:20:40 2009
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.queue;
-
-import org.apache.activemq.dispatch.PriorityLinkedList;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.kahadb.util.LinkedNode;
-
-/**
- */
-public class SingleFlowPriorityQueue<E> extends AbstractFlowQueue<E> {
-
-    private final PriorityLinkedList<PriorityNode> queue;
-    private Mapper<Integer, E> priorityMapper;
-    private final FlowController<E> controller;
-
-    private class PriorityNode extends LinkedNode<PriorityNode> {
-        E elem;
-        int prio;
-    }
-
-    private int messagePriority = 0;
-
-    /**
-     * Creates a flow queue that can handle multiple flows.
-     * 
-     * @param flow
-     *            The {@link Flow}
-     * @param controller
-     *            The FlowController if this queue is flow controlled:
-     */
-    public SingleFlowPriorityQueue(Flow flow, String name, IFlowLimiter<E> limiter) {
-        super(name);
-        this.queue = new PriorityLinkedList<PriorityNode>(10);
-        this.controller = new FlowController<E>(getFlowControllableHook(), flow, limiter, this);
-        super.onFlowOpened(controller);
-    }
-
-    public boolean offer(E elem, ISourceController<?> source) {
-        return controller.offer(elem, source);
-    }
-
-    /**
-     * Performs a limited add to the queue.
-     */
-    public final void add(E elem, ISourceController<?> source) {
-        controller.add(elem, source);
-    }
-
-    /**
-     * Called when the controller accepts a message for this queue.
-     */
-    public final void flowElemAccepted(ISourceController<E> controller, E elem) {
-        PriorityNode node = new PriorityNode();
-        node.elem = elem;
-        node.prio = priorityMapper.map(elem);
-
-        synchronized (this) {
-            queue.add(node, node.prio);
-            updatePriority();
-            notifyReady();
-        }
-    }
-
-    public FlowController<E> getFlowController(Flow flow) {
-        return controller;
-    }
-
-    public final boolean isDispatchReady() {
-        return !queue.isEmpty();
-    }
-
-    public boolean pollingDispatch() {
-        E elem = poll();
-        if (elem != null) {
-            drain.drain(elem, controller);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    public final E poll() {
-        synchronized (this) {
-            PriorityNode node = queue.poll();
-            // FIXME the release should really be done after dispatch.
-            // doing it here saves us from having to resynchronize
-            // after dispatch, but release limiter space too soon.
-            if (node != null) {
-                if (autoRelease) {
-                    controller.elementDispatched(node.elem);
-                }
-                return node.elem;
-            }
-            return null;
-        }
-    }
-
-    private final void updatePriority() {
-        if (dispatchContext != null) {
-            int newPrio = Math.max(queue.getHighestPriority(), dispatchPriority);
-            if (messagePriority != newPrio) {
-                messagePriority = newPrio;
-                dispatchContext.updatePriority(messagePriority);
-            }
-
-        }
-    }
-
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-    }
-}



Mime
View raw message