activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r760028 [1/3] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 30 Mar 2009 16:20:31 GMT
Author: chirino
Date: Mon Mar 30 16:20:28 2009
New Revision: 760028

URL: http://svn.apache.org/viewvc?rev=760028&view=rev
Log:
Applying Colin's patch from https://issues.apache.org/activemq/browse/AMQ-2187


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
    activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db
    activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
    activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,56 +16,86 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
 
 import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.PersistentQueue;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
-    HashSet<PersistentQueue<MessageDelivery>> persistentTargets;
-    // Indicates whether or not the message has already been saved
-    // if it hasn't in memory updates can be done.
+    HashSet<AsciiBuffer> persistentTargets;
+    // Indicates whether or not the message has been saved to the
+    // database, if not then in memory updates can be done.
     boolean saved = false;
     long storeTracking = -1;
     BrokerDatabase store;
+    boolean fromStore = false;
+    boolean enableFlushDelay = true;
+    OperationContext saveContext;
+    boolean cancelled = false;
+
+    public void setFromStore(boolean val) {
+        fromStore = val; // honor the argument; it used to be ignored and the flag forced to true
+    }
 
     public final boolean isFromStore() {
-        return false;
+        return fromStore;
     }
 
-    public final void persist(PersistentQueue<MessageDelivery> queue) {
+    public final void persist(AsciiBuffer queue, boolean delayable) throws IOException {
 
         synchronized (this) {
             if (!saved) {
                 if (persistentTargets == null) {
-                    persistentTargets = new HashSet<PersistentQueue<MessageDelivery>>();
+                    persistentTargets = new HashSet<AsciiBuffer>();
                 }
                 persistentTargets.add(queue);
                 return;
             }
+            if (!delayable) {
+                enableFlushDelay = false;
+            }
         }
-        
-        //TODO probably need to pass in the saving queue's source controller here
-        //and treat it like it is dispatching to the saver queue. 
+
+        // TODO probably need to pass in the saving queue's source controller
+        // here and treat it like it is dispatching to the saver queue.
         store.saveMessage(this, queue, null);
     }
 
-    public final void delete(PersistentQueue<MessageDelivery> queue) {
+    public final void delete(AsciiBuffer queue) {
+        boolean firePersistListener = false;
         synchronized (this) {
             if (!saved) {
                 persistentTargets.remove(queue);
-                return;
+                if (persistentTargets.isEmpty()) {
+                    if (saveContext != null) {
+
+                        if (!cancelled) {
+                            if (saveContext.cancel()) {
+                                cancelled = true;
+                                firePersistListener = true;
+                            }
+
+                            saved = true;
+                        }
+                    }
+                }
+            } else {
+                store.deleteMessage(this, queue);
             }
         }
 
-        store.deleteMessage(this, queue);
+        if (firePersistListener) {
+            onMessagePersisted();
+        }
+
     }
 
-    public synchronized void beginStore(long storeTracking) {
-        saved = true;
+    public void setStoreTracking(long storeTracking) {
         this.storeTracking = storeTracking;
     }
 
@@ -73,21 +103,47 @@
         return storeTracking;
     }
 
-    public Collection<PersistentQueue<MessageDelivery>> getPersistentQueues() {
+    public Collection<AsciiBuffer> getPersistentQueues() {
         return persistentTargets;
     }
 
-    public void persistIfNeeded(ISourceController<?> controller) {
-        boolean saveNeeded = false;
+    public void beginStore() {
         synchronized (this) {
-            if (persistentTargets.isEmpty()) {
+            saved = true;
+        }
+    }
+
+    public void persistIfNeeded(ISourceController<?> controller) throws IOException {
+        boolean firePersistListener = false;
+        synchronized (this) {
+            boolean saveNeeded = true;
+            if (persistentTargets == null || persistentTargets.isEmpty()) {
                 saveNeeded = false;
                 saved = true;
             }
+
+            // If any of the targets requested save then save the message
+            // Note that this could be the case even if the message isn't
+            // persistent if a target requested that the message be spooled
+            // for some other reason such as queue memory overflow.
+            if (saveNeeded) {
+                saveContext = store.persistReceivedMessage(this, controller);
+            }
+            // If none of the targets required persistence, then fire the
+            // persist listener:
+            else if (isResponseRequired() && isPersistent()) {
+                firePersistListener = true;
+            }
         }
 
-        if (saveNeeded) {
-            store.persistReceivedMessage(this, controller);
+        if (firePersistListener) {
+            onMessagePersisted();
         }
+
+    }
+
+    public boolean isFlushDelayable() {
+        // Stays true until persist() is called with delayable == false on an already-saved message.
+        return enableFlushDelay;
     }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Mar 30 16:20:28 2009
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener;
+import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.queue.Store;
+import org.apache.activemq.queue.Subscription;
+
+public class DBQueueStore<K> implements Store<K, MessageDelivery> {
+
+    private final BrokerDatabase database;
+    private final AsciiBuffer queue;
+    private final MessageRetriever retriever;
+
+    private long firstKey = -1;
+    private long lastKey = -1;
+
+    private int count = 0;
+    private boolean loading = true;
+
+    protected HashMap<K, DBStoreNode> map = new HashMap<K, DBStoreNode>();
+    protected TreeMap<Long, DBStoreNode> order = new TreeMap<Long, DBStoreNode>();
+    private Mapper<K, MessageDelivery> keyExtractor;
+
+    DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) {
+        this.database = database;
+        this.queue = queue;
+        retriever = new MessageRetriever(dispatcher);
+        retriever.start();
+    }
+
+    public StoreNode<K, MessageDelivery> add(K key, MessageDelivery delivery) {
+
+        // New to this queue?
+        if (delivery.getStoreTracking() > lastKey) {
+            return addInternal(key, delivery);
+        } else {
+            throw new IllegalArgumentException(this + " Duplicate key: " + delivery);
+        }
+    }
+
+    public void setKeyMapper(Mapper<K, MessageDelivery> keyExtractor) {
+        this.keyExtractor = keyExtractor;
+    }
+    
+    private DBStoreNode addInternal(K key, MessageDelivery delivery) {
+        DBStoreNode node = new DBStoreNode(delivery);
+        map.put(keyExtractor.map(delivery), node);
+        order.put(delivery.getStoreTracking(), node);
+        return node;
+    }
+
+    public boolean isEmpty() {
+        return count == 0;
+    }
+
+    public StoreCursor<K, MessageDelivery> openCursor() {
+        return new DBStoreCursor();
+    }
+
+    public StoreCursor<K, MessageDelivery> openCursorAt(StoreNode<K, MessageDelivery> next) {
+        DBStoreCursor cursor = new DBStoreCursor();
+        cursor.next = (DBStoreNode) next;
+        return cursor;
+    }
+
+    public StoreNode<K, MessageDelivery> remove(K key) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public int size() {
+        return count;
+    }
+
+    private class DBStoreCursor implements StoreCursor<K, MessageDelivery> {
+        private long pos;
+        private long last = -1;
+        
+        private DBStoreNode node;
+        private DBStoreNode next;
+
+        public StoreNode<K, MessageDelivery> peekNext() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        /** Makes {@code node} the element that the following {@code next()} call returns. */
+        public void setNext(StoreNode<K, MessageDelivery> node) {
+            this.next = (DBStoreNode) node; // the parameter, not the field (was a self-assignment)
+        }
+
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            SortedMap<Long, DBStoreNode> m = order.tailMap(last + 1);
+            if (m.isEmpty()) {
+                next = null;
+            } else {
+                next = m.get(m.firstKey());
+            }
+            return next != null;
+        }
+
+        public StoreNode<K, MessageDelivery> next() {
+            if (!hasNext()) { // exhausted: the old finally block threw NullPointerException here
+                throw new java.util.NoSuchElementException();
+            }
+            node = next; // remember the returned element; remove() reads 'node'
+            last = node.tracking; // hasNext() resumes its tailMap search after this key
+            next = null;
+            return node;
+        }
+
+        public boolean isReady() {
+            return !loading;
+        }
+        
+        public void remove() {
+            database.deleteMessage(node.delivery, queue);
+        }
+    }
+
+    private class DBStoreNode implements StoreNode<K, MessageDelivery> {
+        private MessageDelivery delivery;
+        private K key;
+        private long ownerId = -1;
+        private final long tracking;
+
+        DBStoreNode(MessageDelivery delivery) {
+            this.delivery = delivery;
+            tracking = delivery.getStoreTracking();
+            key = keyExtractor.map(delivery);
+            retriever.save(this);
+        }
+
+        public boolean acquire(Subscription<MessageDelivery> owner) {
+            long id = owner.getSink().getResourceId();
+            // Grant ownership when the node is unowned (-1) or already held by this sink's resource id.
+            if (ownerId == -1 || id == ownerId) {
+                ownerId = owner.getSink().getResourceId();
+                return true;
+            }
+            return false;
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public MessageDelivery getValue() {
+            return delivery;
+        }
+
+        public void unacquire() {
+            ownerId = -1;
+        }
+    }
+
+    private class MessageRetriever implements Dispatchable, MessageRestoreListener {
+
+        private final DispatchContext dispatchContext;
+        private AtomicBoolean loaded = new AtomicBoolean(false);
+
+        private long loadCursor = 0;
+        private long max = -1;
+        private long loadedCount;
+        
+        private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
+
+        MessageRetriever(IDispatcher dispatcher) {
+            dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue);
+        }
+
+        public void save(DBStoreNode node) {
+            try {
+                node.delivery.persist(queue, false);
+            } catch (IOException e) {
+                // TODO: the failure is only printed; the caller is not told the persist request failed.
+                e.printStackTrace();
+            }
+        }
+
+        public void start() {
+            if (!loaded.get()) {
+                database.restoreMessages(queue, loadCursor, 50, this);
+            }
+        }
+
+        public boolean dispatch() {
+            while (true) { // drain every restore result queued by messagesRestored()
+                RestoredMessage restored = restoredMsgs.poll();
+
+                if (restored == null) {
+                    break;
+                }
+
+                try {
+                    MessageDelivery delivery = restored.getMessageDelivery();
+                    addInternal(keyExtractor.map(delivery), delivery);
+                    if (firstKey == -1) {
+                        firstKey = delivery.getStoreTracking();
+                    }
+                    if (lastKey < delivery.getStoreTracking()) {
+                        lastKey = delivery.getStoreTracking();
+                    }
+                    loadedCount++;
+
+                } catch (IOException e) {
+                    // TODO: the failure is only printed; the loop moves on to the next restored message.
+                    e.printStackTrace();
+                }
+            }
+            // NOTE(review): loadCursor is never advanced in this class, so every call passes the same value (0) - confirm whether the cursor should move forward.
+            if (!loaded.get()) {
+                database.restoreMessages(queue, loadCursor, 50, this);
+            }
+            return false;
+        }
+
+        public void messagesRestored(Collection<RestoredMessage> msgs) {
+            if (!msgs.isEmpty()) {
+                restoredMsgs.addAll(msgs);
+            } else {
+                loaded.set(true);
+            }
+            dispatchContext.requestDispatch();
+        }
+    }    
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Mar 30 16:20:28 2009
@@ -23,6 +23,8 @@
 
 public interface DeliveryTarget {
     
+    public void deliver(MessageDelivery delivery, ISourceController<?> source);
+    
     public IFlowSink<MessageDelivery> getSink();
     
     public boolean match(MessageDelivery message);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Mar 30 16:20:28 2009
@@ -23,6 +23,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.transport.DispatchableTransportServer;
@@ -45,6 +47,7 @@
     private String connectUri;
     private String name;
     private IDispatcher dispatcher;
+    private BrokerDatabase database;
     private final AtomicBoolean stopping = new AtomicBoolean();
 
     public String getName() {
@@ -62,12 +65,19 @@
         for (VirtualHost virtualHost : virtualHosts.values()) {
             virtualHost.stop();
         }
+        database.stop();
         dispatcher.shutdown();
 
     }
 
     public final void start() throws Exception {
         dispatcher.start();
+        if (database != null) {
+            database.start();
+        } else {
+            throw new Exception("Store not initialized");
+        }
+        addVirtualHost(getDefaultVirtualHost());
 
         for (VirtualHost virtualHost : virtualHosts.values()) {
             virtualHost.start();
@@ -134,12 +144,16 @@
     }
 
     // /////////////////////////////////////////////////////////////////
-    // Virtual Host Related Opperations 
+    // Virtual Host Related Operations
     // /////////////////////////////////////////////////////////////////
     public VirtualHost getDefaultVirtualHost() {
         synchronized (virtualHosts) {
-            if( defaultVirtualHost==null ) {
+            if (defaultVirtualHost == null) {
                 defaultVirtualHost = new VirtualHost();
+                defaultVirtualHost.setDatabase(database);
+                ArrayList<AsciiBuffer> names = new ArrayList<AsciiBuffer>(1);
+                names.add(new AsciiBuffer("default"));
+                defaultVirtualHost.setHostNames(names);
             }
             return defaultVirtualHost;
         }
@@ -174,6 +188,7 @@
                 setDefaultVirtualHost(host);
             }
         }
+        host.setDatabase(database);
     }
 
     public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
@@ -181,9 +196,10 @@
             for (AsciiBuffer name : host.getHostNames()) {
                 virtualHosts.remove(name);
             }
-            // Was the default virtual host removed? Set the default to the next virtual host.
-            if( host == defaultVirtualHost ) {
-                if( virtualHosts.isEmpty() ) {
+            // Was the default virtual host removed? Set the default to the next
+            // virtual host.
+            if (host == defaultVirtualHost) {
+                if (virtualHosts.isEmpty()) {
                     defaultVirtualHost = null;
                 } else {
                     defaultVirtualHost = virtualHosts.values().iterator().next();
@@ -204,4 +220,8 @@
         }
     }
 
+    public void setStore(Store store) {
+        database = new BrokerDatabase(store, dispatcher);
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,10 +16,11 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
+
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.PersistentQueue;
 
 public interface MessageDelivery {
 
@@ -53,13 +54,19 @@
      */
     public void onMessagePersisted();
 
-    public Store.MessageRecord createMessageRecord();
+    public Store.MessageRecord createMessageRecord() throws IOException;
 
     public Buffer getTransactionId();
 
-    public void persist(PersistentQueue<MessageDelivery> queue);
+    public void persist(AsciiBuffer queue, boolean delayable) throws IOException;
 
-    public void delete(PersistentQueue<MessageDelivery> queue);
+    public void delete(AsciiBuffer queue);
+    
+    /**
+     * Sets the unique storage tracking number. 
+     * @param tracking The tracking number. 
+     */
+    public void setStoreTracking(long tracking);
     
     /**
      * Gets the tracking number used to identify this message in the message

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
 import java.util.HashMap;
 
 import org.apache.activemq.broker.DeliveryTarget;
@@ -51,6 +52,10 @@
                 protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
                     return createSharedFlowQueue();
                 }
+
+                public boolean isElementPersistent(MessageDelivery elem) {
+                    return elem.isPersistent();
+                }
             };
             queue.setPartitionMapper(partitionMapper);
             queue.setResourceName(destination.getName().toString());
@@ -74,6 +79,9 @@
             SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
             queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
+            //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
+            //store.setKeyMapper(keyExtractor);
+            //queue.setStore(store);
             queue.setDispatcher(broker.getDispatcher());
             return queue;
         } else {
@@ -81,13 +89,26 @@
             SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
             queue.setKeyMapper(keyExtractor);
             queue.setAutoRelease(true);
+            //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
+            //store.setKeyMapper(keyExtractor);
+            //queue.setStore(store);
             queue.setDispatcher(broker.getDispatcher());
             return queue;
         }
     }
 
-    public final void deliver(ISourceController<MessageDelivery> source, MessageDelivery msg) {
-        queue.add(msg, source);
+    public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
+        try {
+            if(delivery.isPersistent())
+            {
+                delivery.persist(destination.getName(), true);
+            }
+        } catch (IOException e) {
+            // TODO: a persist failure is only printed; the message is still added to the queue below.
+            e.printStackTrace();
+        }
+        
+        queue.add(delivery, source);
     }
     
     public final Destination getDestination() {
@@ -180,5 +201,4 @@
     public boolean isDurable() {
         return true;
     }
-
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,12 +27,9 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.QueueDomain;
 import org.apache.activemq.broker.TopicDomain;
-import org.apache.activemq.broker.store.Store.Callback;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
 
 final public class Router {
 
@@ -40,6 +38,7 @@
 
     private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
     private VirtualHost virtualHost;
+    private BrokerDatabase database;
 
     public Router() {
         domains.put(QUEUE_DOMAIN, new QueueDomain());
@@ -90,8 +89,9 @@
         //        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
 
-        msg.store = getVirtualHost().getDatabase();
-        
+        msg.store = database;
+        msg.setStoreTracking(msg.store.allocateStoreTracking());
+
         // TODO:
         // Consider doing some caching of this target list. Most producers
         // always send to the same destination.
@@ -108,15 +108,26 @@
                 }
             }
 
-            //The sinks will request persistence via MessageDelivery.persist()
-            //if they require persistence:
+            // The sinks will request persistence via MessageDelivery.persist()
+            // if they require persistence:
             for (DeliveryTarget dt : targets) {
-                if (dt.match(msg)) {
-                    dt.getSink().add(msg, controller);
-                }
+                dt.deliver(msg, controller);
+                //if (dt.match(msg)) {
+                //    
+                //    dt.getSink().add(msg, controller);
+                //}
             }
             
-            msg.persistIfNeeded(controller);
+            try {
+                msg.persistIfNeeded(controller);
+            } catch (IOException ioe) {
+                //TODO: Error serializing the message, this should trigger an error
+                //This is a pretty severe error as we've already delivered
+                //the message to the recipients. If we send an error response
+                //back it could result in a duplicate. Does this mean that we
+                //should persist the message prior to sending to the recips?
+                ioe.printStackTrace();
+            }
 
         } else {
             // Let the client know we got the message even though there
@@ -147,10 +158,15 @@
 
     public void setVirtualHost(VirtualHost virtualHost) {
         this.virtualHost = virtualHost;
+        this.database = virtualHost.getDatabase();
     }
 
     public VirtualHost getVirtualHost() {
         return virtualHost;
     }
 
+    public void setDatabase(BrokerDatabase database) {
+        this.database = database;
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Mar 30 16:20:28 2009
@@ -31,10 +31,11 @@
     final private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
     private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
     private Router router;
-    private BrokerDatabase database = new BrokerDatabase();
+    private BrokerDatabase database;
     
     public VirtualHost() {
-        setRouter(new Router());
+        this.router = new Router();
+        this.router.setVirtualHost(this);
     }
     
     public AsciiBuffer getHostName() {
@@ -54,10 +55,6 @@
     public Router getRouter() {
         return router;
     }
-    public void setRouter(Router router) {
-        this.router = router;
-        this.router.setVirtualHost(this);
-    }
 
     public void start() throws Exception {
         for (Queue queue : queues.values()) {
@@ -81,6 +78,7 @@
 
     public void setDatabase(BrokerDatabase store) {
         this.database = store;
+        router.setDatabase(database);
     }
 
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.broker.openwire;
 
+import java.io.IOException;
+
 import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.util.ByteSequence;
 
 public class OpenWireMessageDelivery extends BrokerMessageDelivery {
 
@@ -30,6 +34,7 @@
     private final Message message;
     private Destination destination;
     private AsciiBuffer producerId;
+    private OpenWireFormat storeWireFormat;
     private PersistListener persistListener = null;
 
     public interface PersistListener {
@@ -60,7 +65,7 @@
     }
 
     public AsciiBuffer getMsgId() {
-        return null;
+        return new AsciiBuffer(message.getMessageId().toString());
     }
 
     public AsciiBuffer getProducerId() {
@@ -100,13 +105,12 @@
         return message.isResponseRequired();
     }
 
-
-    public MessageRecord createMessageRecord() {
+    public MessageRecord createMessageRecord() throws IOException {
         MessageRecord record = new MessageRecord();
         record.setEncoding(ENCODING);
-        // TODO: Serialize it..
-        // record.setBuffer()
-        // record.setStreamKey(stream);
+        ByteSequence bytes = storeWireFormat.marshal(message);
+        record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
+        record.setStreamKey((long) 0);
         record.setMessageId(getMsgId());
         return record;
     }
@@ -116,4 +120,7 @@
         return null;
     }
 
+    public void setStoreWireFormat(OpenWireFormat wireFormat) {
+        this.storeWireFormat = wireFormat;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.broker.openwire;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -51,6 +53,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
@@ -66,21 +69,25 @@
 import org.apache.activemq.filter.LogicExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
@@ -88,30 +95,13 @@
     protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
     protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
 
-    protected final Object inboundMutex = new Object();
-    protected IFlowController<OpenWireMessageDelivery> inboundController;
-
     protected BrokerConnection connection;
     private OpenWireFormat wireFormat;
+    private OpenWireFormat storeWireFormat;
     private Router router;
 
     public void start() throws Exception {
-        // Setup the inbound processing..
-        final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false);
-        SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
-        inboundController = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
-            public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
-                if (elem.isResponseRequired()) {
-                    elem.setPersistListener(OpenwireProtocolHandler.this);
-                }
-                router.route(elem, controller);
-                controller.elementDispatched(elem);
-            }
 
-            public String toString() {
-                return flow.getFlowName();
-            }
-        }, flow, limiter, inboundMutex);
     }
 
     public void stop() throws Exception {
@@ -172,6 +162,7 @@
                     ProducerContext producerContext = producers.get(producerId);
 
                     OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+                    md.setStoreWireFormat(storeWireFormat);
 
                     // Only producers that are not using a window will block,
                     // and if it blocks.
@@ -346,11 +337,10 @@
             }.start();
         }
     }
-    
 
     public void onMessagePersisted(OpenWireMessageDelivery delivery) {
         // TODO This method should not block:
-        // Either add to output queue, or spin off in a separate thread. 
+        // Either add to output queue, or spin off in a separate thread.
         ack(delivery.getMessage());
     }
 
@@ -367,52 +357,44 @@
     // Internal Support Methods
     // /////////////////////////////////////////////////////////////////
 
-    static class FlowControllableAdapter implements FlowControllable<OpenWireMessageDelivery> {
-        public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
-        }
-
-        public IFlowSink<OpenWireMessageDelivery> getFlowSink() {
-            return null;
-        }
-
-        public IFlowSource<OpenWireMessageDelivery> getFlowSource() {
-            return null;
-        }
-    }
-
-    class ProducerContext {
+    class ProducerContext extends AbstractLimitedFlowResource<OpenWireMessageDelivery> {
 
+        protected final Object inboundMutex = new Object();
         private IFlowController<OpenWireMessageDelivery> controller;
         private String name;
 
         public ProducerContext(final ProducerInfo info) {
-            this.name = info.getProducerId().toString();
+            super(info.getProducerId().toString());
+            final Flow flow = new Flow("broker-" + getResourceName() + "-inbound", false); // local 'name' field is no longer assigned
 
             // Openwire only uses credit windows at the producer level for
             // producers that request the feature.
+            IFlowLimiter<OpenWireMessageDelivery> limiter;
             if (info.getWindowSize() > 0) {
-                final Flow flow = new Flow("broker-" + name + "-inbound", false);
-                WindowLimiter<OpenWireMessageDelivery> limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+                limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
                     @Override
                     protected void sendCredit(int credit) {
                         ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
                         connection.write(ack);
                     }
                 };
-
-                controller = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
-                    public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
-                        router.route(msg, controller);
-                        controller.elementDispatched(msg);
-                    }
-
-                    public String toString() {
-                        return flow.getFlowName();
-                    }
-                }, flow, limiter, inboundMutex);
             } else {
-                controller = inboundController;
+                // No producer window requested: limit by the connection's input window size instead.
+                limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
             }
+
+            controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
+                public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
+                    router.route(msg, controller);
+                    controller.elementDispatched(msg);
+                }
+
+                public IFlowResource getFlowResource() {
+                    return ProducerContext.this;
+                }
+            }, flow, limiter, inboundMutex);
+
+            super.onFlowOpened(controller);
         }
     }
 
@@ -422,13 +404,27 @@
         private String name;
         private BooleanExpression selector;
         private boolean durable;
+        private AsciiBuffer durableQueueName;
 
         private SingleFlowRelay<MessageDelivery> queue;
         public WindowLimiter<MessageDelivery> limiter;
 
+        HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
+        LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+
         public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
             this.info = info;
             this.name = info.getConsumerId().toString();
+            durable = info.isDurable();
+            if (durable) {
+                durableQueueName = new AsciiBuffer(info.getSubscriptionName());
+                try {
+                    connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName);
+                } catch (Throwable thrown) {
+                    thrown.printStackTrace();
+                }
+            }
+
             selector = parseSelector(info);
 
             Flow flow = new Flow("broker-" + name + "-outbound", false);
@@ -445,6 +441,13 @@
                     md.setConsumerId(info.getConsumerId());
                     md.setMessage(msg);
                     md.setDestination(msg.getDestination());
+                    // Add to the pending list if persistent and we are durable:
+                    if (isDurable() && message.isPersistent()) {
+                        synchronized (queue) {
+                            pendingMessages.put(msg.getMessageId(), message);
+                            pendingMessageIds.add(msg.getMessageId());
+                        }
+                    }
                     connection.write(md);
                 };
             });
@@ -452,6 +455,19 @@
 
         public void ack(MessageAck info) {
             synchronized (queue) {
+                if (isDurable()) {
+                    MessageId id = info.getLastMessageId();
+                    // Delete stored copies in dispatch order up to the acked id. An id that was
+                    // never tracked (non-persistent, null) must not drain the whole pending list.
+                    while (pendingMessages.containsKey(id) && !pendingMessageIds.isEmpty()) {
+                        MessageId pendingId = pendingMessageIds.removeFirst();
+                        MessageDelivery delivery = pendingMessages.remove(pendingId);
+                        // A duplicate id leaves a list entry whose delivery was already removed.
+                        if (delivery != null) {
+                            delivery.delete(durableQueueName);
+                        }
+                    }
+                }
                 limiter.onProtocolCredit(info.getMessageCount());
             }
         }
@@ -460,6 +476,23 @@
             return queue;
         }
 
+        public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
+            if (!match(delivery)) {
+                return;
+            }
+
+            if (isDurable() && delivery.isPersistent()) {
+                try {
+                    delivery.persist(durableQueueName, true);
+                } catch (IOException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+
+            queue.add(delivery, source);
+        }
+
         public boolean match(MessageDelivery message) {
             Message msg = message.asType(Message.class);
             if (msg == null) {
@@ -541,9 +574,17 @@
 
     public void setWireFormat(WireFormat wireFormat) {
         this.wireFormat = (OpenWireFormat) wireFormat;
+        this.storeWireFormat = this.wireFormat.copy();
+        storeWireFormat.setCacheEnabled(false);
+        storeWireFormat.setTightEncodingEnabled(false);
+        storeWireFormat.setSizePrefixDisabled(false);
     }
 
-    public MessageDelivery createMessageDelivery(MessageRecord record) {
-        throw new UnsupportedOperationException();
+    public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+        Buffer buf = record.getBuffer();
+        Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length));
+        OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);
+        delivery.setStoreWireFormat(storeWireFormat);
+        return delivery;
     }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.protocol;
 
+import java.io.IOException;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerConnection;
 import org.apache.activemq.broker.MessageDelivery;
@@ -29,5 +31,6 @@
     public void onException(Exception error);
     public void setWireFormat(WireFormat wf);
     
-    public MessageDelivery createMessageDelivery(MessageRecord record);
+    public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -38,12 +38,13 @@
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -63,12 +64,11 @@
         public void onStompFrame(StompFrame frame) throws Exception;
     }
 
+    private InboundContext inboundContext;
+
     protected final HashMap<String, ActionHander> actionHandlers = new HashMap<String, ActionHander>();
     protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
 
-    protected final Object inboundMutex = new Object();
-    protected IFlowController<StompMessageDelivery> inboundController;
-
     protected BrokerConnection connection;
 
     // TODO: need to update the FrameTranslator to normalize to new broker API
@@ -99,15 +99,14 @@
             }
         });
         actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
+            
             public void onStompFrame(StompFrame frame) throws Exception {
                 String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
                 Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
 
                 frame.setAction(Stomp.Responses.MESSAGE);
                 StompMessageDelivery md = new StompMessageDelivery(frame, destination);
-                while (!inboundController.offer(md, null)) {
-                    inboundController.waitForFlowUnblock();
-                }
+                inboundContext.onReceive(md);
             }
         });
         actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander() {
@@ -147,22 +146,7 @@
     }
 
     public void start() throws Exception {
-        // Setup the inbound processing..
-        final Flow inboundFlow = new Flow("broker-" + connection.getName() + "-inbound", false);
-        SizeLimiter<StompMessageDelivery> inLimiter = new SizeLimiter<StompMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
-        inboundController = new FlowController<StompMessageDelivery>(new FlowControllableAdapter() {
-            public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
-                if (elem.isResponseRequired()) {
-                    elem.setPersistListener(StompProtocolHandler.this);
-                }
-                router.route(elem, controller);
-                controller.elementDispatched(elem);
-            }
-
-            public String toString() {
-                return inboundFlow.getFlowName();
-            }
-        }, inboundFlow, inLimiter, inboundMutex);
+        inboundContext = new InboundContext();
 
         Flow outboundFlow = new Flow("broker-" + connection.getName() + "-outbound", false);
         SizeLimiter<MessageDelivery> outLimiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
@@ -240,16 +224,40 @@
     // /////////////////////////////////////////////////////////////////
     // Internal Support Methods
     // /////////////////////////////////////////////////////////////////
-    static class FlowControllableAdapter implements FlowControllable<StompMessageDelivery> {
-        public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
-        }
 
-        public IFlowSink<StompMessageDelivery> getFlowSink() {
-            return null;
+    class InboundContext extends AbstractLimitedFlowResource<StompMessageDelivery> {
+        protected final Object inboundMutex = new Object();
+        protected IFlowController<StompMessageDelivery> inboundController;
+
+        InboundContext() {
+            super("broker-" + connection.getName() + "-inbound");
+            // Setup the inbound processing..
+            final Flow inboundFlow = new Flow(getResourceName(), false);
+            SizeLimiter<StompMessageDelivery> inLimiter = new SizeLimiter<StompMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+            inboundController = new FlowController<StompMessageDelivery>(new FlowControllable<StompMessageDelivery>() {
+                public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
+                    if (elem.isResponseRequired()) {
+                        elem.setPersistListener(StompProtocolHandler.this);
+                    }
+                    router.route(elem, controller);
+                    controller.elementDispatched(elem);
+                }
+
+                public String toString() {
+                    return inboundFlow.getFlowName();
+                }
+
+                public IFlowResource getFlowResource() {
+                    return InboundContext.this;
+                }
+            }, inboundFlow, inLimiter, inboundMutex);
+            super.onFlowOpened(inboundController);
         }
 
-        public IFlowSource<StompMessageDelivery> getFlowSource() {
-            return null;
+        public void onReceive(StompMessageDelivery md) throws InterruptedException {
+            while (!inboundController.offer(md, null)) {
+                inboundController.waitForFlowUnblock();
+            }
         }
     }
 
@@ -268,6 +276,7 @@
         private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
 
         private boolean durable;
+        private AsciiBuffer durableQueueName;
 
         public ConsumerContext(final StompFrame subscribe) throws Exception {
             translator = translator(subscribe);
@@ -378,6 +387,24 @@
             // }
         }
 
+        public void deliver(MessageDelivery delivery, ISourceController<?> source) {
+            if (!match(delivery)) {
+                return;
+            }
+
+            if (isDurable() && delivery.isPersistent()) {
+                try {
+                    delivery.persist(durableQueueName, true);
+                } catch (IOException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+
+            queue.add(delivery, source);
+
+        }
+
         public boolean isDurable() {
             return durable;
         }
@@ -407,9 +434,10 @@
         connection.write(errorMessage);
     }
 
-    //Callback from MessageDelivery when message's persistence guarantees are met. 
+    // Callback from MessageDelivery when message's persistence guarantees are
+    // met.
     public void onMessagePersisted(StompMessageDelivery delivery) {
-        //TODO this method must not block:
+        // TODO this method must not block:
         ack(delivery.getStomeFame());
     }
 
@@ -483,7 +511,7 @@
         // TODO Auto-generated method stub
         return null;
     }
-    
+
     public MessageDelivery createMessageDelivery(MessageRecord record) {
         throw new UnsupportedOperationException();
     }



Mime
View raw message