Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 55997 invoked from network); 30 Mar 2009 16:21:09 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 30 Mar 2009 16:21:09 -0000 Received: (qmail 17697 invoked by uid 500); 30 Mar 2009 16:21:09 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 17644 invoked by uid 500); 30 Mar 2009 16:21:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 17635 invoked by uid 99); 30 Mar 2009 16:21:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2009 16:21:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2009 16:20:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 12192238889D; Mon, 30 Mar 2009 16:20:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r760028 [1/3] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav... Date: Mon, 30 Mar 2009 16:20:31 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090330162037.12192238889D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Mar 30 16:20:28 2009 New Revision: 760028 URL: http://svn.apache.org/viewvc?rev=760028&view=rev Log: Applying Colin's patch from https://issues.apache.org/activemq/browse/AMQ-2187 Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/ activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Mar 30 16:20:28 2009 @@ -16,56 +16,86 @@ */ package org.apache.activemq.broker; +import java.io.IOException; import java.util.Collection; import java.util.HashSet; import org.apache.activemq.broker.store.BrokerDatabase; +import org.apache.activemq.broker.store.BrokerDatabase.OperationContext; import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.queue.PersistentQueue; +import org.apache.activemq.protobuf.AsciiBuffer; public abstract class BrokerMessageDelivery implements MessageDelivery { - HashSet> persistentTargets; - // Indicates whether or not the message has already been saved - // if it hasn't in memory updates can be done. + HashSet persistentTargets; + // Indicates whether or not the message has been saved to the + // database, if not then in memory updates can be done. boolean saved = false; long storeTracking = -1; BrokerDatabase store; + boolean fromStore = false; + boolean enableFlushDelay = true; + OperationContext saveContext; + boolean cancelled = false; + + public void setFromStore(boolean val) { + fromStore = true; + } public final boolean isFromStore() { - return false; + return fromStore; } - public final void persist(PersistentQueue queue) { + public final void persist(AsciiBuffer queue, boolean delayable) throws IOException { synchronized (this) { if (!saved) { if (persistentTargets == null) { - persistentTargets = new HashSet>(); + persistentTargets = new HashSet(); } persistentTargets.add(queue); return; } + if (!delayable) { + enableFlushDelay = false; + } } - - //TODO probably need to pass in the saving queue's source controller here - //and treat it like it is dispatching to the saver queue. + + // TODO probably need to pass in the saving queue's source controller + // here and treat it like it is dispatching to the saver queue. store.saveMessage(this, queue, null); } - public final void delete(PersistentQueue queue) { + public final void delete(AsciiBuffer queue) { + boolean firePersistListener = false; synchronized (this) { if (!saved) { persistentTargets.remove(queue); - return; + if (persistentTargets.isEmpty()) { + if (saveContext != null) { + + if (!cancelled) { + if (saveContext.cancel()) { + cancelled = true; + firePersistListener = true; + } + + saved = true; + } + } + } + } else { + store.deleteMessage(this, queue); } } - store.deleteMessage(this, queue); + if (firePersistListener) { + onMessagePersisted(); + } + } - public synchronized void beginStore(long storeTracking) { - saved = true; + public void setStoreTracking(long storeTracking) { this.storeTracking = storeTracking; } @@ -73,21 +103,47 @@ return storeTracking; } - public Collection> getPersistentQueues() { + public Collection getPersistentQueues() { return persistentTargets; } - public void persistIfNeeded(ISourceController controller) { - boolean saveNeeded = false; + public void beginStore() { synchronized (this) { - if (persistentTargets.isEmpty()) { + saved = true; + } + } + + public void persistIfNeeded(ISourceController controller) throws IOException { + boolean firePersistListener = false; + synchronized (this) { + boolean saveNeeded = true; + if (persistentTargets == null || persistentTargets.isEmpty()) { saveNeeded = false; saved = true; } + + // If any of the targets requested save then save the message + // Note that this could be the case even if the message isn't + // persistent if a target requested that the message be spooled + // for some other reason such as queue memory overflow. + if (saveNeeded) { + saveContext = store.persistReceivedMessage(this, controller); + } + // If none of the targets required persistence, then fire the + // persist listener: + else if (isResponseRequired() && isPersistent()) { + firePersistListener = true; + } } - if (saveNeeded) { - store.persistReceivedMessage(this, controller); + if (firePersistListener) { + onMessagePersisted(); } + + } + + public boolean isFlushDelayable() { + // TODO Auto-generated method stub + return enableFlushDelay; } } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=760028&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Mar 30 16:20:28 2009 @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.store.BrokerDatabase; +import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener; +import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.IDispatcher.DispatchContext; +import org.apache.activemq.dispatch.IDispatcher.Dispatchable; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.Mapper; +import org.apache.activemq.queue.Store; +import org.apache.activemq.queue.Subscription; + +public class DBQueueStore implements Store { + + private final BrokerDatabase database; + private final AsciiBuffer queue; + private final MessageRetriever retriever; + + private long firstKey = -1; + private long lastKey = -1; + + private int count = 0; + private boolean loading = true; + + protected HashMap map = new HashMap(); + protected TreeMap order = new TreeMap(); + private Mapper keyExtractor; + + DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) { + this.database = database; + this.queue = queue; + retriever = new MessageRetriever(dispatcher); + retriever.start(); + } + + public StoreNode add(K key, MessageDelivery delivery) { + + // New to this queue? + if (delivery.getStoreTracking() > lastKey) { + return addInternal(key, delivery); + } else { + throw new IllegalArgumentException(this + " Duplicate key: " + delivery); + } + } + + public void setKeyMapper(Mapper keyExtractor) { + this.keyExtractor = keyExtractor; + } + + private DBStoreNode addInternal(K key, MessageDelivery delivery) { + DBStoreNode node = new DBStoreNode(delivery); + map.put(keyExtractor.map(delivery), node); + order.put(delivery.getStoreTracking(), node); + return node; + } + + public boolean isEmpty() { + return count == 0; + } + + public StoreCursor openCursor() { + return new DBStoreCursor(); + } + + public StoreCursor openCursorAt(StoreNode next) { + DBStoreCursor cursor = new DBStoreCursor(); + cursor.next = (DBStoreNode) next; + return cursor; + } + + public StoreNode remove(K key) { + // TODO Auto-generated method stub + return null; + } + + public int size() { + return count; + } + + private class DBStoreCursor implements StoreCursor { + private long pos; + private long last = -1; + + private DBStoreNode node; + private DBStoreNode next; + + public StoreNode peekNext() { + // TODO Auto-generated method stub + return null; + } + + public void setNext(StoreNode node) { + this.next = (DBStoreNode) next; + + } + + public boolean hasNext() { + if (next != null) + return true; + + SortedMap m = order.tailMap(last + 1); + if (m.isEmpty()) { + next = null; + } else { + next = m.get(m.firstKey()); + } + return next != null; + } + + public StoreNode next() { + try { + hasNext(); + return next; + } finally { + last = next.tracking; + next = null; + } + } + + public boolean isReady() { + return !loading; + } + + public void remove() { + database.deleteMessage(node.delivery, queue); + } + } + + private class DBStoreNode implements StoreNode { + private MessageDelivery delivery; + private K key; + private long ownerId = -1; + private final long tracking; + + DBStoreNode(MessageDelivery delivery) { + this.delivery = delivery; + tracking = delivery.getStoreTracking(); + key = keyExtractor.map(delivery); + retriever.save(this); + } + + public boolean acquire(Subscription owner) { + long id = owner.getSink().getResourceId(); + // TODO Auto-generated method stub + if (ownerId == -1 || id == ownerId) { + ownerId = owner.getSink().getResourceId(); + return true; + } + return false; + } + + public K getKey() { + return key; + } + + public MessageDelivery getValue() { + return delivery; + } + + public void unacquire() { + ownerId = -1; + } + } + + private class MessageRetriever implements Dispatchable, MessageRestoreListener { + + private final DispatchContext dispatchContext; + private AtomicBoolean loaded = new AtomicBoolean(false); + + private long loadCursor = 0; + private long max = -1; + private long loadedCount; + + private final ConcurrentLinkedQueue restoredMsgs = new ConcurrentLinkedQueue(); + + MessageRetriever(IDispatcher dispatcher) { + dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue); + } + + public void save(DBStoreNode node) { + try { + node.delivery.persist(queue, false); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void start() { + if (!loaded.get()) { + database.restoreMessages(queue, loadCursor, 50, this); + } + } + + public boolean dispatch() { + while (true) { + RestoredMessage restored = restoredMsgs.poll(); + + if (restored == null) { + break; + } + + try { + MessageDelivery delivery = restored.getMessageDelivery(); + addInternal(keyExtractor.map(delivery), delivery); + if (firstKey == -1) { + firstKey = delivery.getStoreTracking(); + } + if (lastKey < delivery.getStoreTracking()) { + lastKey = delivery.getStoreTracking(); + } + loadedCount++; + + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if (!loaded.get()) { + database.restoreMessages(queue, loadCursor, 50, this); + } + return false; + } + + public void messagesRestored(Collection msgs) { + if (!msgs.isEmpty()) { + restoredMsgs.addAll(msgs); + } else { + loaded.set(true); + } + dispatchContext.requestDispatch(); + } + } +} Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Mar 30 16:20:28 2009 @@ -23,6 +23,8 @@ public interface DeliveryTarget { + public void deliver(MessageDelivery delivery, ISourceController source); + public IFlowSink getSink(); public boolean match(MessageDelivery message); Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Mar 30 16:20:28 2009 @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Connection; +import org.apache.activemq.broker.store.BrokerDatabase; +import org.apache.activemq.broker.store.Store; import org.apache.activemq.dispatch.IDispatcher; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.transport.DispatchableTransportServer; @@ -45,6 +47,7 @@ private String connectUri; private String name; private IDispatcher dispatcher; + private BrokerDatabase database; private final AtomicBoolean stopping = new AtomicBoolean(); public String getName() { @@ -62,12 +65,19 @@ for (VirtualHost virtualHost : virtualHosts.values()) { virtualHost.stop(); } + database.stop(); dispatcher.shutdown(); } public final void start() throws Exception { dispatcher.start(); + if (database != null) { + database.start(); + } else { + throw new Exception("Store not initialized"); + } + addVirtualHost(getDefaultVirtualHost()); for (VirtualHost virtualHost : virtualHosts.values()) { virtualHost.start(); @@ -134,12 +144,16 @@ } // ///////////////////////////////////////////////////////////////// - // Virtual Host Related Opperations + // Virtual Host Related Opperations // ///////////////////////////////////////////////////////////////// public VirtualHost getDefaultVirtualHost() { synchronized (virtualHosts) { - if( defaultVirtualHost==null ) { + if (defaultVirtualHost == null) { defaultVirtualHost = new VirtualHost(); + defaultVirtualHost.setDatabase(database); + ArrayList names = new ArrayList(1); + names.add(new AsciiBuffer("default")); + defaultVirtualHost.setHostNames(names); } return defaultVirtualHost; } @@ -174,6 +188,7 @@ setDefaultVirtualHost(host); } } + host.setDatabase(database); } public synchronized void removeVirtualHost(VirtualHost host) throws Exception { @@ -181,9 +196,10 @@ for (AsciiBuffer name : host.getHostNames()) { virtualHosts.remove(name); } - // Was the default virtual host removed? Set the default to the next virtual host. - if( host == defaultVirtualHost ) { - if( virtualHosts.isEmpty() ) { + // Was the default virtual host removed? Set the default to the next + // virtual host. + if (host == defaultVirtualHost) { + if (virtualHosts.isEmpty()) { defaultVirtualHost = null; } else { defaultVirtualHost = virtualHosts.values().iterator().next(); @@ -204,4 +220,8 @@ } } + public void setStore(Store store) { + database = new BrokerDatabase(store, dispatcher); + } + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 30 16:20:28 2009 @@ -16,10 +16,11 @@ */ package org.apache.activemq.broker; +import java.io.IOException; + import org.apache.activemq.broker.store.Store; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.queue.PersistentQueue; public interface MessageDelivery { @@ -53,13 +54,19 @@ */ public void onMessagePersisted(); - public Store.MessageRecord createMessageRecord(); + public Store.MessageRecord createMessageRecord() throws IOException; public Buffer getTransactionId(); - public void persist(PersistentQueue queue); + public void persist(AsciiBuffer queue, boolean delayable) throws IOException; - public void delete(PersistentQueue queue); + public void delete(AsciiBuffer queue); + + /** + * Sets the unique storage tracking number. + * @param tracking The tracking number. + */ + public void setStoreTracking(long tracking); /** * Gets the tracking number used to identify this message in the message Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Mar 30 16:20:28 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker; +import java.io.IOException; import java.util.HashMap; import org.apache.activemq.broker.DeliveryTarget; @@ -51,6 +52,10 @@ protected IQueue cratePartition(Integer partitionKey) { return createSharedFlowQueue(); } + + public boolean isElementPersistent(MessageDelivery elem) { + return elem.isPersistent(); + } }; queue.setPartitionMapper(partitionMapper); queue.setResourceName(destination.getName().toString()); @@ -74,6 +79,9 @@ SharedPriorityQueue queue = new SharedPriorityQueue(destination.getName().toString(), limiter); queue.setKeyMapper(keyExtractor); queue.setAutoRelease(true); + //DBQueueStore store = new DBQueueStore(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher()); + //store.setKeyMapper(keyExtractor); + //queue.setStore(store); queue.setDispatcher(broker.getDispatcher()); return queue; } else { @@ -81,13 +89,26 @@ SharedQueue queue = new SharedQueue(destination.getName().toString(), limiter); queue.setKeyMapper(keyExtractor); queue.setAutoRelease(true); + //DBQueueStore store = new DBQueueStore(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher()); + //store.setKeyMapper(keyExtractor); + //queue.setStore(store); queue.setDispatcher(broker.getDispatcher()); return queue; } } - public final void deliver(ISourceController source, MessageDelivery msg) { - queue.add(msg, source); + public final void deliver(MessageDelivery delivery, ISourceController source) { + try { + if(delivery.isPersistent()) + { + delivery.persist(destination.getName(), true); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + queue.add(delivery, source); } public final Destination getDestination() { @@ -180,5 +201,4 @@ public boolean isDurable() { return true; } - } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Mar 30 16:20:28 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -26,12 +27,9 @@ import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.broker.QueueDomain; import org.apache.activemq.broker.TopicDomain; -import org.apache.activemq.broker.store.Store.Callback; -import org.apache.activemq.broker.store.Store.Session; -import org.apache.activemq.broker.store.Store.VoidCallback; +import org.apache.activemq.broker.store.BrokerDatabase; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.protobuf.AsciiBuffer; -import org.apache.activemq.protobuf.Buffer; final public class Router { @@ -40,6 +38,7 @@ private final HashMap domains = new HashMap(); private VirtualHost virtualHost; + private BrokerDatabase database; public Router() { domains.put(QUEUE_DOMAIN, new QueueDomain()); @@ -90,8 +89,9 @@ // Collection targets = route(msg.getDestination(), msg); - msg.store = getVirtualHost().getDatabase(); - + msg.store = database; + msg.setStoreTracking(msg.store.allocateStoreTracking()); + // TODO: // Consider doing some caching of this target list. Most producers // always send to the same destination. @@ -108,15 +108,26 @@ } } - //The sinks will request persistence via MessageDelivery.persist() - //if they require persistence: + // The sinks will request persistence via MessageDelivery.persist() + // if they require persistence: for (DeliveryTarget dt : targets) { - if (dt.match(msg)) { - dt.getSink().add(msg, controller); - } + dt.deliver(msg, controller); + //if (dt.match(msg)) { + // + // dt.getSink().add(msg, controller); + //} } - msg.persistIfNeeded(controller); + try { + msg.persistIfNeeded(controller); + } catch (IOException ioe) { + //TODO: Error serializing the message, this should trigger an error + //This is a pretty severe error as we've already delivered + //the message to the recipients. If we send an error response + //back it could result in a duplicate. Does this mean that we + //should persist the message prior to sending to the recips? + ioe.printStackTrace(); + } } else { // Let the client know we got the message even though there @@ -147,10 +158,15 @@ public void setVirtualHost(VirtualHost virtualHost) { this.virtualHost = virtualHost; + this.database = virtualHost.getDatabase(); } public VirtualHost getVirtualHost() { return virtualHost; } + public void setDatabase(BrokerDatabase database) { + this.database = database; + } + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Mar 30 16:20:28 2009 @@ -31,10 +31,11 @@ final private HashMap queues = new HashMap(); private ArrayList hostNames = new ArrayList(); private Router router; - private BrokerDatabase database = new BrokerDatabase(); + private BrokerDatabase database; public VirtualHost() { - setRouter(new Router()); + this.router = new Router(); + this.router.setVirtualHost(this); } public AsciiBuffer getHostName() { @@ -54,10 +55,6 @@ public Router getRouter() { return router; } - public void setRouter(Router router) { - this.router = router; - this.router.setVirtualHost(this); - } public void start() throws Exception { for (Queue queue : queues.values()) { @@ -81,6 +78,7 @@ public void setDatabase(BrokerDatabase store) { this.database = store; + router.setDatabase(database); } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 30 16:20:28 2009 @@ -16,12 +16,16 @@ */ package org.apache.activemq.broker.openwire; +import java.io.IOException; + import org.apache.activemq.broker.BrokerMessageDelivery; import org.apache.activemq.broker.Destination; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.command.Message; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.util.ByteSequence; public class OpenWireMessageDelivery extends BrokerMessageDelivery { @@ -30,6 +34,7 @@ private final Message message; private Destination destination; private AsciiBuffer producerId; + private OpenWireFormat storeWireFormat; private PersistListener persistListener = null; public interface PersistListener { @@ -60,7 +65,7 @@ } public AsciiBuffer getMsgId() { - return null; + return new AsciiBuffer(message.getMessageId().toString()); } public AsciiBuffer getProducerId() { @@ -100,13 +105,12 @@ return message.isResponseRequired(); } - - public MessageRecord createMessageRecord() { + public MessageRecord createMessageRecord() throws IOException { MessageRecord record = new MessageRecord(); record.setEncoding(ENCODING); - // TODO: Serialize it.. - // record.setBuffer() - // record.setStreamKey(stream); + ByteSequence bytes = storeWireFormat.marshal(message); + record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength())); + record.setStreamKey((long) 0); record.setMessageId(getMsgId()); return record; } @@ -116,4 +120,7 @@ return null; } + public void setStoreWireFormat(OpenWireFormat wireFormat) { + this.storeWireFormat = wireFormat; + } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Mar 30 16:20:28 2009 @@ -16,8 +16,10 @@ */ package org.apache.activemq.broker.openwire; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -51,6 +53,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -66,21 +69,25 @@ import org.apache.activemq.filter.LogicExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NoLocalExpression; +import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowController; import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.SizeLimiter; import org.apache.activemq.flow.ISinkController.FlowControllable; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.queue.SingleFlowRelay; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.transport.WireFormatNegotiator; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener { @@ -88,30 +95,13 @@ protected final HashMap producers = new HashMap(); protected final HashMap consumers = new HashMap(); - protected final Object inboundMutex = new Object(); - protected IFlowController inboundController; - protected BrokerConnection connection; private OpenWireFormat wireFormat; + private OpenWireFormat storeWireFormat; private Router router; public void start() throws Exception { - // Setup the inbound processing.. - final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false); - SizeLimiter limiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); - inboundController = new FlowController(new FlowControllableAdapter() { - public void flowElemAccepted(ISourceController controller, OpenWireMessageDelivery elem) { - if (elem.isResponseRequired()) { - elem.setPersistListener(OpenwireProtocolHandler.this); - } - router.route(elem, controller); - controller.elementDispatched(elem); - } - public String toString() { - return flow.getFlowName(); - } - }, flow, limiter, inboundMutex); } public void stop() throws Exception { @@ -172,6 +162,7 @@ ProducerContext producerContext = producers.get(producerId); OpenWireMessageDelivery md = new OpenWireMessageDelivery(info); + md.setStoreWireFormat(storeWireFormat); // Only producers that are not using a window will block, // and if it blocks. @@ -346,11 +337,10 @@ }.start(); } } - public void onMessagePersisted(OpenWireMessageDelivery delivery) { // TODO This method should not block: - // Either add to output queue, or spin off in a separate thread. + // Either add to output queue, or spin off in a separate thread. ack(delivery.getMessage()); } @@ -367,52 +357,44 @@ // Internal Support Methods // ///////////////////////////////////////////////////////////////// - static class FlowControllableAdapter implements FlowControllable { - public void flowElemAccepted(ISourceController controller, OpenWireMessageDelivery elem) { - } - - public IFlowSink getFlowSink() { - return null; - } - - public IFlowSource getFlowSource() { - return null; - } - } - - class ProducerContext { + class ProducerContext extends AbstractLimitedFlowResource { + protected final Object inboundMutex = new Object(); private IFlowController controller; private String name; public ProducerContext(final ProducerInfo info) { - this.name = info.getProducerId().toString(); + super(info.getProducerId().toString()); + final Flow flow = new Flow("broker-" + name + "-inbound", false); // Openwire only uses credit windows at the producer level for // producers that request the feature. + IFlowLimiter limiter; if (info.getWindowSize() > 0) { - final Flow flow = new Flow("broker-" + name + "-inbound", false); - WindowLimiter limiter = new WindowLimiter(false, flow, info.getWindowSize(), info.getWindowSize() / 2) { + limiter = new WindowLimiter(false, flow, info.getWindowSize(), info.getWindowSize() / 2) { @Override protected void sendCredit(int credit) { ProducerAck ack = new ProducerAck(info.getProducerId(), credit); connection.write(ack); } }; - - controller = new FlowController(new FlowControllableAdapter() { - public void flowElemAccepted(ISourceController controller, OpenWireMessageDelivery msg) { - router.route(msg, controller); - controller.elementDispatched(msg); - } - - public String toString() { - return flow.getFlowName(); - } - }, flow, limiter, inboundMutex); } else { - controller = inboundController; + + limiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); } + + controller = new FlowController(new FlowControllable() { + public void flowElemAccepted(ISourceController controller, OpenWireMessageDelivery msg) { + router.route(msg, controller); + controller.elementDispatched(msg); + } + + public IFlowResource getFlowResource() { + return ProducerContext.this; + } + }, flow, limiter, inboundMutex); + + super.onFlowOpened(controller); } } @@ -422,13 +404,27 @@ private String name; private BooleanExpression selector; private boolean durable; + private AsciiBuffer durableQueueName; private SingleFlowRelay queue; public WindowLimiter limiter; + HashMap pendingMessages = new HashMap(); + LinkedList pendingMessageIds = new LinkedList(); + public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException { this.info = info; this.name = info.getConsumerId().toString(); + durable = info.isDurable(); + if (durable) { + durableQueueName = new AsciiBuffer(info.getSubscriptionName()); + try { + connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName); + } catch (Throwable thrown) { + thrown.printStackTrace(); + } + } + selector = parseSelector(info); Flow flow = new Flow("broker-" + name + "-outbound", false); @@ -445,6 +441,13 @@ md.setConsumerId(info.getConsumerId()); md.setMessage(msg); md.setDestination(msg.getDestination()); + // Add to the pending list if persistent and we are durable: + if (isDurable() && message.isPersistent()) { + synchronized (queue) { + pendingMessages.put(msg.getMessageId(), message); + pendingMessageIds.add(msg.getMessageId()); + } + } connection.write(md); }; }); @@ -452,6 +455,19 @@ public void ack(MessageAck info) { synchronized (queue) { + if (isDurable()) { + MessageId id = info.getLastMessageId(); + while (!pendingMessageIds.isEmpty()) { + MessageId pendingId = pendingMessageIds.peekFirst(); + MessageDelivery delivery = pendingMessages.remove(pendingId); + delivery.delete(durableQueueName); + pendingMessageIds.removeFirst(); + if (pendingId.equals(id)) { + break; + } + } + + } limiter.onProtocolCredit(info.getMessageCount()); } } @@ -460,6 +476,23 @@ return queue; } + public final void deliver(MessageDelivery delivery, ISourceController source) { + if (!match(delivery)) { + return; + } + + if (isDurable() && delivery.isPersistent()) { + try { + delivery.persist(durableQueueName, true); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + queue.add(delivery, source); + } + public boolean match(MessageDelivery message) { Message msg = message.asType(Message.class); if (msg == null) { @@ -541,9 +574,17 @@ public void setWireFormat(WireFormat wireFormat) { this.wireFormat = (OpenWireFormat) wireFormat; + this.storeWireFormat = this.wireFormat.copy(); + storeWireFormat.setCacheEnabled(false); + storeWireFormat.setTightEncodingEnabled(false); + storeWireFormat.setSizePrefixDisabled(false); } - public MessageDelivery createMessageDelivery(MessageRecord record) { - throw new UnsupportedOperationException(); + public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException { + Buffer buf = record.getBuffer(); + Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length)); + OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message); + delivery.setStoreWireFormat(storeWireFormat); + return delivery; } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Mar 30 16:20:28 2009 @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.protocol; +import java.io.IOException; + import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerConnection; import org.apache.activemq.broker.MessageDelivery; @@ -29,5 +31,6 @@ public void onException(Exception error); public void setWireFormat(WireFormat wf); - public MessageDelivery createMessageDelivery(MessageRecord record); + public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException; + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 30 16:20:28 2009 @@ -38,12 +38,13 @@ import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowController; import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.SizeLimiter; import org.apache.activemq.flow.ISinkController.FlowControllable; @@ -63,12 +64,11 @@ public void onStompFrame(StompFrame frame) throws Exception; } + private InboundContext inboundContext; + protected final HashMap actionHandlers = new HashMap(); protected final HashMap consumers = new HashMap(); - protected final Object inboundMutex = new Object(); - protected IFlowController inboundController; - protected BrokerConnection connection; // TODO: need to update the FrameTranslator to normalize to new broker API @@ -99,15 +99,14 @@ } }); actionHandlers.put(Stomp.Commands.SEND, new ActionHander() { + public void onStompFrame(StompFrame frame) throws Exception { String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION); Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest); frame.setAction(Stomp.Responses.MESSAGE); StompMessageDelivery md = new StompMessageDelivery(frame, destination); - while (!inboundController.offer(md, null)) { - inboundController.waitForFlowUnblock(); - } + inboundContext.onReceive(md); } }); actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander() { @@ -147,22 +146,7 @@ } public void start() throws Exception { - // Setup the inbound processing.. - final Flow inboundFlow = new Flow("broker-" + connection.getName() + "-inbound", false); - SizeLimiter inLimiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); - inboundController = new FlowController(new FlowControllableAdapter() { - public void flowElemAccepted(ISourceController controller, StompMessageDelivery elem) { - if (elem.isResponseRequired()) { - elem.setPersistListener(StompProtocolHandler.this); - } - router.route(elem, controller); - controller.elementDispatched(elem); - } - - public String toString() { - return inboundFlow.getFlowName(); - } - }, inboundFlow, inLimiter, inboundMutex); + inboundContext = new InboundContext(); Flow outboundFlow = new Flow("broker-" + connection.getName() + "-outbound", false); SizeLimiter outLimiter = new SizeLimiter(connection.getOutputWindowSize(), connection.getOutputWindowSize()); @@ -240,16 +224,40 @@ // ///////////////////////////////////////////////////////////////// // Internal Support Methods // ///////////////////////////////////////////////////////////////// - static class FlowControllableAdapter implements FlowControllable { - public void flowElemAccepted(ISourceController controller, StompMessageDelivery elem) { - } - public IFlowSink getFlowSink() { - return null; + class InboundContext extends AbstractLimitedFlowResource { + protected final Object inboundMutex = new Object(); + protected IFlowController inboundController; + + InboundContext() { + super("broker-" + connection.getName() + "-inbound"); + // Setup the inbound processing.. + final Flow inboundFlow = new Flow(getResourceName(), false); + SizeLimiter inLimiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); + inboundController = new FlowController(new FlowControllable() { + public void flowElemAccepted(ISourceController controller, StompMessageDelivery elem) { + if (elem.isResponseRequired()) { + elem.setPersistListener(StompProtocolHandler.this); + } + router.route(elem, controller); + controller.elementDispatched(elem); + } + + public String toString() { + return inboundFlow.getFlowName(); + } + + public IFlowResource getFlowResource() { + return InboundContext.this; + } + }, inboundFlow, inLimiter, inboundMutex); + super.onFlowOpened(inboundController); } - public IFlowSource getFlowSource() { - return null; + public void onReceive(StompMessageDelivery md) throws InterruptedException { + while (!inboundController.offer(md, null)) { + inboundController.waitForFlowUnblock(); + } } } @@ -268,6 +276,7 @@ private LinkedHashMap sentMessageIds = new LinkedHashMap(); private boolean durable; + private AsciiBuffer durableQueueName; public ConsumerContext(final StompFrame subscribe) throws Exception { translator = translator(subscribe); @@ -378,6 +387,24 @@ // } } + public void deliver(MessageDelivery delivery, ISourceController source) { + if (!match(delivery)) { + return; + } + + if (isDurable() && delivery.isPersistent()) { + try { + delivery.persist(durableQueueName, true); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + queue.add(delivery, source); + + } + public boolean isDurable() { return durable; } @@ -407,9 +434,10 @@ connection.write(errorMessage); } - //Callback from MessageDelivery when message's persistence guarantees are met. + // Callback from MessageDelivery when message's persistence guarantees are + // met. public void onMessagePersisted(StompMessageDelivery delivery) { - //TODO this method must not block: + // TODO this method must not block: ack(delivery.getStomeFame()); } @@ -483,7 +511,7 @@ // TODO Auto-generated method stub return null; } - + public MessageDelivery createMessageDelivery(MessageRecord record) { throw new UnsupportedOperationException(); }