From commits-return-10709-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Apr 27 18:41:33 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 5266 invoked from network); 27 Apr 2009 18:41:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 27 Apr 2009 18:41:33 -0000 Received: (qmail 54096 invoked by uid 500); 27 Apr 2009 18:41:33 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 54038 invoked by uid 500); 27 Apr 2009 18:41:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 54028 invoked by uid 99); 27 Apr 2009 18:41:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2009 18:41:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2009 18:41:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 150EE23889DE; Mon, 27 Apr 2009 18:40:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r769099 [1/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav... Date: Mon, 27 Apr 2009 18:40:49 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090427184056.150EE23889DE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Apr 27 18:40:44 2009 New Revision: 769099 URL: http://svn.apache.org/viewvc?rev=769099&view=rev Log: Applying colins patch for https://issues.apache.org/activemq/browse/AMQ-2230 Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Apr 27 18:40:44 2009 @@ -17,129 +17,173 @@ package org.apache.activemq.broker; import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; +import java.util.HashMap; +import java.util.Set; +import java.util.Map.Entry; import org.apache.activemq.broker.store.BrokerDatabase; import org.apache.activemq.broker.store.BrokerDatabase.OperationContext; +import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.QueueStore; +import org.apache.activemq.queue.QueueStore.QueueDescriptor; public abstract class BrokerMessageDelivery implements MessageDelivery { - HashSet persistentTargets; - // Indicates whether or not the message has been saved to the - // database, if not then in memory updates can be done. - boolean saved = false; + // True while the message is being dispatched to the delivery targets: + boolean dispatching = false; + + // A non null pending save indicates that the message is the + // saver queue and that the message + OperationContext pendingSave; + + // List of persistent targets for which the message should be saved + // when dispatch is complete: + HashMap persistentTargets; + long storeTracking = -1; BrokerDatabase store; boolean fromStore = false; boolean enableFlushDelay = true; - OperationContext saveContext; - boolean cancelled = false; + private int limiterSize = -1; - public void setFromStore(boolean val) { + public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) { fromStore = true; + store = database; + storeTracking = mRecord.getKey(); + limiterSize = mRecord.getSize(); + } + + public final int getFlowLimiterSize() { + if (limiterSize == -1) { + limiterSize = getMemorySize(); + } + return limiterSize; } + /** + * Subclass must implement this to return their current memory size + * estimate. + * + * @return The memory size of the message. + */ + public abstract int getMemorySize(); + public final boolean isFromStore() { return fromStore; } - public final void persist(AsciiBuffer queue, boolean delayable) throws IOException { - + public final void persist(QueueStore.QueueDescriptor queue, ISourceController controller, long queueSequence, boolean delayable) throws IOException { synchronized (this) { - if (!saved) { + // Can flush of this message to the store be delayed? + if (enableFlushDelay && !delayable) { + enableFlushDelay = false; + } + // If this message is being dispatched then add the queue to the + // list of queues for which to save the message when dispatch is + // finished: + if (dispatching) { if (persistentTargets == null) { - persistentTargets = new HashSet(); + persistentTargets = new HashMap(); } - persistentTargets.add(queue); + persistentTargets.put(queue, queueSequence); return; } - if (!delayable) { - enableFlushDelay = false; + // Otherwise, if it is still in the saver queue, we can add this + // queue to the queue list: + else if (pendingSave != null) { + persistentTargets.put(queue, queueSequence); + if (!delayable) { + pendingSave.requestFlush(); + } + return; } } - // TODO probably need to pass in the saving queue's source controller - // here and treat it like it is dispatching to the saver queue. - store.saveMessage(this, queue, null); + store.saveMessage(this, queue, queueSequence, controller); } - public final void delete(AsciiBuffer queue) { + public final void acknowledge(QueueStore.QueueDescriptor queue) { boolean firePersistListener = false; + boolean deleted = false; synchronized (this) { - if (!saved) { - persistentTargets.remove(queue); - if (persistentTargets.isEmpty()) { - if (saveContext != null) { + // If the message hasn't been saved to the database + // then we don't need to issue a delete: + if (dispatching || pendingSave != null) { - if (!cancelled) { - if (saveContext.cancel()) { - cancelled = true; - firePersistListener = true; - } + // Remove the queue: + persistentTargets.remove(queue); + deleted = true; - saved = true; + // We get a save context when we place the message in the + // database queue. If it has been added to the queue, + // and we've removed the last queue, see if we can cancel + // the save: + if (pendingSave != null && persistentTargets.isEmpty()) { + if (pendingSave.cancel()) { + pendingSave = null; + if (isPersistent()) { + firePersistListener = true; } } } - } else { - store.deleteMessage(this, queue); } } + if (!deleted) { + store.deleteMessage(this, queue); + } + if (firePersistListener) { onMessagePersisted(); } } - public void setStoreTracking(long storeTracking) { - this.storeTracking = storeTracking; + public void beginDispatch(BrokerDatabase database) { + this.store = database; + dispatching = true; + if (storeTracking == -1) { + storeTracking = database.allocateStoreTracking(); + } } public long getStoreTracking() { return storeTracking; } - public Collection getPersistentQueues() { - return persistentTargets; + public Set> getPersistentQueues() { + return persistentTargets.entrySet(); } public void beginStore() { synchronized (this) { - saved = true; + pendingSave = null; } } - public void persistIfNeeded(ISourceController controller) throws IOException { + public void finishDispatch(ISourceController controller) throws IOException { boolean firePersistListener = false; synchronized (this) { - boolean saveNeeded = true; - if (persistentTargets == null || persistentTargets.isEmpty()) { - saveNeeded = false; - saved = true; - } - // If any of the targets requested save then save the message // Note that this could be the case even if the message isn't // persistent if a target requested that the message be spooled // for some other reason such as queue memory overflow. - if (saveNeeded) { - saveContext = store.persistReceivedMessage(this, controller); + if (persistentTargets != null && !persistentTargets.isEmpty()) { + pendingSave = store.persistReceivedMessage(this, controller); } + // If none of the targets required persistence, then fire the // persist listener: - else if (isResponseRequired() && isPersistent()) { + if (pendingSave == null || !isPersistent()) { firePersistListener = true; } + dispatching = false; } if (firePersistListener) { onMessagePersisted(); } - } public boolean isFlushDelayable() { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Apr 27 18:40:44 2009 @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.broker.store.BrokerDatabase; -import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener; -import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage; -import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.dispatch.IDispatcher.DispatchContext; -import org.apache.activemq.dispatch.IDispatcher.Dispatchable; -import org.apache.activemq.protobuf.AsciiBuffer; -import org.apache.activemq.queue.Mapper; -import org.apache.activemq.queue.Store; -import org.apache.activemq.queue.Subscription; - -public class DBQueueStore implements Store { - - private final BrokerDatabase database; - private final AsciiBuffer queue; - private final MessageRetriever retriever; - - private long firstKey = -1; - private long lastKey = -1; - - private int count = 0; - private boolean loading = true; - - protected HashMap map = new HashMap(); - protected TreeMap order = new TreeMap(); - private Mapper keyExtractor; - - DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) { - this.database = database; - this.queue = queue; - retriever = new MessageRetriever(dispatcher); - retriever.start(); - } - - public StoreNode add(K key, MessageDelivery delivery) { - - // New to this queue? - if (delivery.getStoreTracking() > lastKey) { - return addInternal(key, delivery); - } else { - throw new IllegalArgumentException(this + " Duplicate key: " + delivery); - } - } - - public void setKeyMapper(Mapper keyExtractor) { - this.keyExtractor = keyExtractor; - } - - private DBStoreNode addInternal(K key, MessageDelivery delivery) { - DBStoreNode node = new DBStoreNode(delivery); - map.put(keyExtractor.map(delivery), node); - order.put(delivery.getStoreTracking(), node); - return node; - } - - public boolean isEmpty() { - return count == 0; - } - - public StoreCursor openCursor() { - return new DBStoreCursor(); - } - - public StoreCursor openCursorAt(StoreNode next) { - DBStoreCursor cursor = new DBStoreCursor(); - cursor.next = (DBStoreNode) next; - return cursor; - } - - public StoreNode remove(K key) { - // TODO Auto-generated method stub - return null; - } - - public int size() { - return count; - } - - private class DBStoreCursor implements StoreCursor { - private long pos; - private long last = -1; - - private DBStoreNode node; - private DBStoreNode next; - - public StoreNode peekNext() { - // TODO Auto-generated method stub - return null; - } - - public void setNext(StoreNode node) { - this.next = (DBStoreNode) next; - - } - - public boolean hasNext() { - if (next != null) - return true; - - SortedMap m = order.tailMap(last + 1); - if (m.isEmpty()) { - next = null; - } else { - next = m.get(m.firstKey()); - } - return next != null; - } - - public StoreNode next() { - try { - hasNext(); - return next; - } finally { - last = next.tracking; - next = null; - } - } - - public boolean isReady() { - return !loading; - } - - public void remove() { - database.deleteMessage(node.delivery, queue); - } - } - - private class DBStoreNode implements StoreNode { - private MessageDelivery delivery; - private K key; - private long ownerId = -1; - private final long tracking; - - DBStoreNode(MessageDelivery delivery) { - this.delivery = delivery; - tracking = delivery.getStoreTracking(); - key = keyExtractor.map(delivery); - retriever.save(this); - } - - public boolean acquire(Subscription owner) { - long id = owner.getSink().getResourceId(); - // TODO Auto-generated method stub - if (ownerId == -1 || id == ownerId) { - ownerId = owner.getSink().getResourceId(); - return true; - } - return false; - } - - public K getKey() { - return key; - } - - public MessageDelivery getValue() { - return delivery; - } - - public void unacquire() { - ownerId = -1; - } - } - - private class MessageRetriever implements Dispatchable, MessageRestoreListener { - - private final DispatchContext dispatchContext; - private AtomicBoolean loaded = new AtomicBoolean(false); - - private long loadCursor = 0; - private long max = -1; - private long loadedCount; - - private final ConcurrentLinkedQueue restoredMsgs = new ConcurrentLinkedQueue(); - - MessageRetriever(IDispatcher dispatcher) { - dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue); - } - - public void save(DBStoreNode node) { - try { - node.delivery.persist(queue, false); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void start() { - if (!loaded.get()) { - database.restoreMessages(queue, loadCursor, 50, this); - } - } - - public boolean dispatch() { - while (true) { - RestoredMessage restored = restoredMsgs.poll(); - - if (restored == null) { - break; - } - - try { - MessageDelivery delivery = restored.getMessageDelivery(); - addInternal(keyExtractor.map(delivery), delivery); - if (firstKey == -1) { - firstKey = delivery.getStoreTracking(); - } - if (lastKey < delivery.getStoreTracking()) { - lastKey = delivery.getStoreTracking(); - } - loadedCount++; - - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - if (!loaded.get()) { - database.restoreMessages(queue, loadCursor, 50, this); - } - return false; - } - - public void messagesRestored(Collection msgs) { - if (!msgs.isEmpty()) { - restoredMsgs.addAll(msgs); - } else { - loaded.set(true); - } - dispatchContext.requestDispatch(); - } - } -} Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Apr 27 18:40:44 2009 @@ -19,7 +19,6 @@ import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.flow.IFlowSink; import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.protobuf.AsciiBuffer; public interface DeliveryTarget { @@ -27,9 +26,9 @@ public IFlowSink getSink(); + public boolean hasSelector(); + public boolean match(MessageDelivery message); public boolean isDurable(); - - public AsciiBuffer getPersistentQueueName(); } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Apr 27 18:40:44 2009 @@ -48,6 +48,7 @@ private String name; private IDispatcher dispatcher; private BrokerDatabase database; + private final AtomicBoolean stopping = new AtomicBoolean(); public String getName() { @@ -123,6 +124,14 @@ this.dispatcher = dispatcher; } + public BrokerDatabase getDatabase() { + return database; + } + + public void setDatabase(BrokerDatabase database) { + this.database = database; + } + public String getBindUri() { return bindUri; } @@ -149,8 +158,7 @@ public VirtualHost getDefaultVirtualHost() { synchronized (virtualHosts) { if (defaultVirtualHost == null) { - defaultVirtualHost = new VirtualHost(); - defaultVirtualHost.setDatabase(database); + defaultVirtualHost = new VirtualHost(this); ArrayList names = new ArrayList(1); names.add(new AsciiBuffer("default")); defaultVirtualHost.setHostNames(names); @@ -188,7 +196,6 @@ setDefaultVirtualHost(host); } } - host.setDatabase(database); } public synchronized void removeVirtualHost(VirtualHost host) throws Exception { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Apr 27 18:40:44 2009 @@ -19,8 +19,10 @@ import java.io.IOException; import org.apache.activemq.broker.store.Store; +import org.apache.activemq.flow.ISourceController; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.queue.QueueStore; public interface MessageDelivery { @@ -50,7 +52,7 @@ /** * Called when the message's persistence requirements have been met. This - * method must not block. + * method must not restoreBlock. */ public void onMessagePersisted(); @@ -58,16 +60,31 @@ public Buffer getTransactionId(); - public void persist(AsciiBuffer queue, boolean delayable) throws IOException; + /** + * Asynchronously persists a message in the store. + * + * @param queue + * The queue against which to save the message. + * @param controller + * The source of the message. + * @param sequenceNumber + * The sequence number of the message in the queue + * @param delayable + * Can be set to indicate that flush of the message can be + * delayed in the hopes that an acknowledgement will negate the + * need for a delete + * @throws IOException If there is an exception serializing the message. + */ + public void persist(QueueStore.QueueDescriptor queue, ISourceController controller, long sequenceNumber, boolean delayable) throws IOException; - public void delete(AsciiBuffer queue); - /** - * Sets the unique storage tracking number. - * @param tracking The tracking number. + * Acknowledges the message for a particular queue. This will cause it to be + * deleted from the message store. + * + * @param queue The queue for which to acknowledge the message. */ - public void setStoreTracking(long tracking); - + public void acknowledge(QueueStore.QueueDescriptor queue); + /** * Gets the tracking number used to identify this message in the message * store. Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Apr 27 18:40:44 2009 @@ -24,122 +24,36 @@ import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.flow.IFlowSink; import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.flow.PrioritySizeLimiter; -import org.apache.activemq.flow.SizeLimiter; -import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.queue.IQueue; -import org.apache.activemq.queue.Mapper; -import org.apache.activemq.queue.PartitionedQueue; -import org.apache.activemq.queue.SharedPriorityQueue; -import org.apache.activemq.queue.SharedQueue; +import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.Subscription; +import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback; public class Queue implements DeliveryTarget { HashMap> subs = new HashMap>(); private Destination destination; - private IQueue queue; - private MessageBroker broker; - - private Mapper partitionMapper; - private Mapper keyExtractor; - - private IQueue createQueue() { + private IQueue queue; + private VirtualHost virtualHost; - if (partitionMapper!=null) { - PartitionedQueue queue = new PartitionedQueue() { - @Override - protected IQueue cratePartition(Integer partitionKey) { - return createSharedFlowQueue(); - } - - public boolean isElementPersistent(MessageDelivery elem) { - return elem.isPersistent(); - } - }; - queue.setPartitionMapper(partitionMapper); - queue.setResourceName(destination.getName().toString()); - return queue; - } else { - return createSharedFlowQueue(); - } + Queue(IQueue queue) + { + this.queue = queue; } - - - public static final Mapper PRIORITY_MAPPER = new Mapper() { - public Integer map(MessageDelivery element) { - return element.getPriority(); - } - }; - private IQueue createSharedFlowQueue() { - if (MessageBroker.MAX_PRIORITY > 1) { - PrioritySizeLimiter limiter = new PrioritySizeLimiter(100, 1, MessageBroker.MAX_PRIORITY); - limiter.setPriorityMapper(PRIORITY_MAPPER); - SharedPriorityQueue queue = new SharedPriorityQueue(destination.getName().toString(), limiter); - queue.setKeyMapper(keyExtractor); - queue.setAutoRelease(true); - //DBQueueStore store = new DBQueueStore(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher()); - //store.setKeyMapper(keyExtractor); - //queue.setStore(store); - queue.setDispatcher(broker.getDispatcher()); - return queue; - } else { - SizeLimiter limiter = new SizeLimiter(100, 1); - SharedQueue queue = new SharedQueue(destination.getName().toString(), limiter); - queue.setKeyMapper(keyExtractor); - queue.setAutoRelease(true); - //DBQueueStore store = new DBQueueStore(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher()); - //store.setKeyMapper(keyExtractor); - //queue.setStore(store); - queue.setDispatcher(broker.getDispatcher()); - return queue; - } - } - public final void deliver(MessageDelivery delivery, ISourceController source) { - try { - if(delivery.isPersistent()) - { - delivery.persist(destination.getName(), true); - } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - queue.add(delivery, source); } - - public final Destination getDestination() { - return destination; - } public final void addConsumer(final DeliveryTarget dt) { - Subscription sub = new Subscription() { - public boolean isPreAcquired() { - return true; - } - - public boolean matches(MessageDelivery message) { - return dt.match(message); - } - - public boolean isRemoveOnDispatch() { - return true; - } + Subscription sub = new QueueSubscription(dt); - public IFlowSink getSink() { - return dt.getSink(); - } - - @Override - public String toString() { - return getSink().toString(); - } - }; - subs.put(dt, sub); - queue.addSubscription(sub); + Subscription old = subs.put(dt, sub); + if (old == null) { + queue.addSubscription(sub); + } else { + subs.put(dt, old); + } } public boolean removeSubscirption(final DeliveryTarget dt) { @@ -151,54 +65,108 @@ } public void start() throws Exception { - queue = createQueue(); + queue.start(); } public void stop() throws Exception { + if (queue != null) { + queue.stop(); + } } public IFlowSink getSink() { return queue; } + public boolean hasSelector() { + return false; + } + public boolean match(MessageDelivery message) { return true; } - public MessageBroker getBroker() { - return broker; + public VirtualHost getBroker() { + return virtualHost; } - public void setBroker(MessageBroker broker) { - this.broker = broker; + public void setVirtualHost(VirtualHost virtualHost) { + this.virtualHost = virtualHost; } - public Mapper getPartitionMapper() { - return partitionMapper; + public void setDestination(Destination destination) { + this.destination = destination; } - public void setPartitionMapper(Mapper partitionMapper) { - this.partitionMapper = partitionMapper; + public final Destination getDestination() { + return destination; } - - public Mapper getKeyExtractor() { - return keyExtractor; + + public boolean isDurable() + { + return true; } - public void setKeyExtractor(Mapper keyExtractor) { - this.keyExtractor = keyExtractor; - } + public static class QueueSubscription implements Subscription { + final DeliveryTarget target; - public void setDestination(Destination destination) { - this.destination = destination; - } + public QueueSubscription(DeliveryTarget dt) { + this.target = dt; + } + + public boolean isPreAcquired() { + return true; + } + + public boolean matches(MessageDelivery message) { + return target.match(message); + } + + public boolean hasSelector() { + return target.hasSelector(); + } + + public boolean isRemoveOnDispatch() { + return false; + } - public AsciiBuffer getPersistentQueueName() { - // TODO Auto-generated method stub - return destination.getName(); + public IFlowSink getSink() { + return target.getSink(); + } + + @Override + public String toString() { + return target.getSink().toString(); + } + + public boolean offer(MessageDelivery elem, ISourceController controller, SubscriptionDeliveryCallback callback) { + return target.getSink().offer(new QueueDelivery(elem, callback), controller); + } + + public boolean isBrowser() { + return false; + } } - public boolean isDurable() { - return true; + private static class QueueDelivery extends MessageDeliveryWrapper { + private final SubscriptionDeliveryCallback callback; + + QueueDelivery(MessageDelivery delivery, SubscriptionDeliveryCallback callback) { + super(delivery); + this.callback = callback; + } + + @Override + public void persist(QueueStore.QueueDescriptor queue, ISourceController controller, long sequenceNumber, boolean delayable) throws IOException { + // We override this for queue deliveries as the sub needn't + // persist the message + } + + public void acknowledge(QueueStore.QueueDescriptor queue) { + if (callback != null) { + callback.acknowledge(); + } + } + } } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Apr 27 18:40:44 2009 @@ -89,37 +89,26 @@ // Collection targets = route(msg.getDestination(), msg); - msg.store = database; - msg.setStoreTracking(msg.store.allocateStoreTracking()); + //Set up the delivery for persistence: + msg.beginDispatch(database); - // TODO: - // Consider doing some caching of this target list. Most producers - // always send to the same destination. - if (targets != null) { - - if (msg.isResponseRequired()) { - // We need to ack the message once we ensure we won't loose it. - // We know we won't loose it once it's persisted or delivered to - // a consumer Setup a callback to get notifed once one of those - // happens. - if (!msg.isPersistent()) { - // Let the client know the broker got the message. - msg.onMessagePersisted(); + try + { + // TODO: + // Consider doing some caching of this sub list. Most producers + // always send to the same destination. + if (targets != null) { + // The sinks will request persistence via MessageDelivery.persist() + // if they require persistence: + for (DeliveryTarget dt : targets) { + dt.deliver(msg, controller); } } - - // The sinks will request persistence via MessageDelivery.persist() - // if they require persistence: - for (DeliveryTarget dt : targets) { - dt.deliver(msg, controller); - //if (dt.match(msg)) { - // - // dt.getSink().add(msg, controller); - //} - } - + } + finally + { try { - msg.persistIfNeeded(controller); + msg.finishDispatch(controller); } catch (IOException ioe) { //TODO: Error serializing the message, this should trigger an error //This is a pretty severe error as we've already delivered @@ -128,13 +117,6 @@ //should persist the message prior to sending to the recips? ioe.printStackTrace(); } - - } else { - // Let the client know we got the message even though there - // were no valid targets to deliver the message to. - if (msg.isResponseRequired()) { - msg.onMessagePersisted(); - } } } @@ -158,7 +140,6 @@ public void setVirtualHost(VirtualHost virtualHost) { this.virtualHost = virtualHost; - this.database = virtualHost.getDatabase(); } public VirtualHost getVirtualHost() { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Apr 27 18:40:44 2009 @@ -20,66 +20,105 @@ import java.util.HashMap; import org.apache.activemq.Service; -import org.apache.activemq.broker.store.BrokerDatabase; import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.IQueue; /** * @author chirino */ public class VirtualHost implements Service { - - final private HashMap queues = new HashMap(); + + final private BrokerQueueStore queueStore; + final private MessageBroker broker; + final private HashMap queues = new HashMap(); private ArrayList hostNames = new ArrayList(); private Router router; - private BrokerDatabase database; - - public VirtualHost() { + private boolean started; + + public VirtualHost(MessageBroker broker) { + this.broker = broker; this.router = new Router(); this.router.setVirtualHost(this); + this.queueStore = new BrokerQueueStore(); } - + public AsciiBuffer getHostName() { - if( hostNames.size() > 0 ) { + if (hostNames.size() > 0) { hostNames.get(0); } return null; } - + public ArrayList getHostNames() { return hostNames; } + public void setHostNames(ArrayList hostNames) { this.hostNames = hostNames; } - + public Router getRouter() { return router; } - public void start() throws Exception { + public synchronized void start() throws Exception { + + if (started) { + return; + } + + router.setDatabase(broker.getDatabase()); + + queueStore.setDatabase(broker.getDatabase()); + queueStore.setDispatcher(broker.getDispatcher()); + queueStore.loadQueues(); + // Create Queue instances + for (IQueue iQueue : queueStore.getSharedQueues()) { + Queue queue = new Queue(iQueue); + Domain domain = router.getDomain(Router.QUEUE_DOMAIN); + Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName()); + queue.setDestination(dest); + domain.add(dest.getName(), queue); + queues.put(dest.getName(), queue); + } for (Queue queue : queues.values()) { queue.start(); } + started = true; } - public void stop() throws Exception { + + public synchronized void stop() throws Exception { + if (!started) { + return; + } for (Queue queue : queues.values()) { queue.stop(); } + started = false; } - public void addQueue(Queue queue) { - Domain domain = router.getDomain(queue.getDestination().getDomain()); - domain.add(queue.getDestination().getName(), queue); - } - - public BrokerDatabase getDatabase() { - return database; + public synchronized Queue createQueue(Destination dest) throws Exception { + if(!started) + { + //Queues from the store must be loaded before we can create new ones: + throw new IllegalStateException("Can't create queue on unstarted host"); + } + + Queue queue = queues.get(dest); + // If the queue doesn't exist create it: + if (queue == null) { + IQueue iQueue = queueStore.createSharedQueue(dest.getName().toString()); + queue = new Queue(iQueue); + queue.setDestination(dest); + Domain domain = router.getDomain(Router.QUEUE_DOMAIN); + domain.add(dest.getName(), queue); + queues.put(dest.getName(), queue); + } + queue.start(); + return queue; } - public void setDatabase(BrokerDatabase store) { - this.database = store; - router.setDatabase(database); + public BrokerQueueStore getQueueStore() { + return queueStore; } - - } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Apr 27 18:40:44 2009 @@ -36,6 +36,7 @@ private AsciiBuffer producerId; private OpenWireFormat storeWireFormat; private PersistListener persistListener = null; + private final int size; public interface PersistListener { public void onMessagePersisted(OpenWireMessageDelivery delivery); @@ -43,6 +44,7 @@ public OpenWireMessageDelivery(Message message) { this.message = message; + this.size = message.getSize(); } public void setPersistListener(PersistListener listener) { @@ -56,7 +58,8 @@ return destination; } - public int getFlowLimiterSize() { + public int getMemorySize() { + //return size; return 1; } @@ -112,6 +115,7 @@ record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength())); record.setStreamKey((long) 0); record.setMessageId(getMsgId()); + record.setSize(getFlowLimiterSize()); return record; } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Apr 27 18:40:44 2009 @@ -20,12 +20,14 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.WindowLimiter; import org.apache.activemq.broker.BrokerConnection; +import org.apache.activemq.broker.BrokerMessageDelivery; import org.apache.activemq.broker.DeliveryTarget; import org.apache.activemq.broker.Destination; import org.apache.activemq.broker.MessageDelivery; @@ -83,6 +85,7 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.SingleFlowRelay; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.state.CommandVisitor; @@ -100,6 +103,10 @@ private OpenWireFormat storeWireFormat; private Router router; + public OpenwireProtocolHandler() { + setStoreWireFormat(new OpenWireFormat()); + } + public void start() throws Exception { } @@ -133,7 +140,6 @@ public Response processAddConsumer(ConsumerInfo info) throws Exception { ConsumerContext ctx = new ConsumerContext(info); consumers.put(info.getConsumerId(), ctx); - router.bind(convert(info.getDestination()), ctx); return ack(command); } @@ -403,11 +409,13 @@ private final ConsumerInfo info; private String name; private BooleanExpression selector; - private boolean durable; - private AsciiBuffer durableQueueName; + private boolean isDurable; + private boolean isQueueReceiver; + private QueueStore.QueueDescriptor durableQueueId; private SingleFlowRelay queue; public WindowLimiter limiter; + private AtomicLong deliverySequence = new AtomicLong(0); HashMap pendingMessages = new HashMap(); LinkedList pendingMessageIds = new LinkedList(); @@ -415,11 +423,13 @@ public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException { this.info = info; this.name = info.getConsumerId().toString(); - durable = info.isDurable(); - if (durable) { - durableQueueName = new AsciiBuffer(info.getSubscriptionName()); + + isDurable = info.isDurable(); + if (isDurable) { + durableQueueId = new QueueStore.QueueDescriptor(); + durableQueueId.setQueueName(new AsciiBuffer(info.getSubscriptionName())); try { - connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName); + connection.getBroker().getDefaultVirtualHost().getQueueStore().addQueue(durableQueueId); } catch (Throwable thrown) { thrown.printStackTrace(); } @@ -442,36 +452,42 @@ md.setMessage(msg); md.setDestination(msg.getDestination()); // Add to the pending list if persistent and we are durable: - if (isDurable() && message.isPersistent()) { + if (message.isPersistent() && (isDurable() || isQueueReceiver())) { synchronized (queue) { Object old = pendingMessages.put(msg.getMessageId(), message); - if(old != null) - { + if (old != null) { new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace(); } pendingMessageIds.add(msg.getMessageId()); connection.write(md); } - } - else - { + } else { + if (isQueueReceiver()) { + message.acknowledge(durableQueueId); + } connection.write(md); } }; }); + + // Subscribe + if (info.getDestination().isQueue()) { + isQueueReceiver = true; + } + router.bind(convert(info.getDestination()), this); } + public void ack(MessageAck info) { - //TODO: The pending message queue could probably be optimized to avoid having - //to create a new list here. - LinkedList acked = new LinkedList(); + // TODO: The pending message queue could probably be optimized to + // avoid having to create a new list here. + LinkedList acked = new LinkedList(); synchronized (queue) { - if (isDurable()) { + if (isDurable() || isQueueReceiver()) { MessageId id = info.getLastMessageId(); while (!pendingMessageIds.isEmpty()) { MessageId pendingId = pendingMessageIds.getFirst(); MessageDelivery delivery = pendingMessages.remove(pendingId); acked.add(delivery); - delivery.delete(durableQueueName); pendingMessageIds.removeFirst(); if (pendingId.equals(id)) { break; @@ -481,12 +497,11 @@ } limiter.onProtocolCredit(info.getMessageCount()); } - - //Delete outside of synchronization on queue to avoid contention with enqueueing - //threads. - for(MessageDelivery delivery : acked) - { - delivery.delete(durableQueueName); + + // Delete outside of synchronization on queue to avoid contention + // with enqueueing threads. + for (MessageDelivery delivery : acked) { + delivery.acknowledge(durableQueueId); } } @@ -501,9 +516,9 @@ if (isDurable() && delivery.isPersistent()) { try { - delivery.persist(durableQueueName, true); + delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true); } catch (IOException e) { - // TODO Auto-generated catch block + // TODO Auto-generated catch restoreBlock e.printStackTrace(); } } @@ -511,6 +526,10 @@ queue.add(delivery, source); } + public boolean hasSelector() { + return selector != null; + } + public boolean match(MessageDelivery message) { Message msg = message.asType(Message.class); if (msg == null) { @@ -529,7 +548,11 @@ } public boolean isDurable() { - return durable; + return isDurable; + } + + public boolean isQueueReceiver() { + return isQueueReceiver; } public AsciiBuffer getPersistentQueueName() { @@ -550,8 +573,7 @@ AsciiBuffer domain; if (dest.isQueue()) { domain = Router.QUEUE_DOMAIN; - } - if (dest.isTopic()) { + } else if (dest.isTopic()) { domain = Router.TOPIC_DOMAIN; } else { throw new IllegalArgumentException("Unsupported domain type: " + dest); @@ -592,13 +614,18 @@ public void setWireFormat(WireFormat wireFormat) { this.wireFormat = (OpenWireFormat) wireFormat; - this.storeWireFormat = this.wireFormat.copy(); + setStoreWireFormat(this.wireFormat.copy()); + } + + private void setStoreWireFormat(OpenWireFormat wireFormat) { + this.storeWireFormat = wireFormat; + storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION); storeWireFormat.setCacheEnabled(false); storeWireFormat.setTightEncodingEnabled(false); storeWireFormat.setSizePrefixDisabled(false); } - public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException { + public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException { Buffer buf = record.getBuffer(); Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length)); OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message); Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Apr 27 18:40:44 2009 @@ -20,7 +20,7 @@ import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerConnection; -import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.BrokerMessageDelivery; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.wireformat.WireFormat; @@ -31,6 +31,6 @@ public void onException(Exception error); public void setWireFormat(WireFormat wf); - public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException; + public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException; } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Apr 27 18:40:44 2009 @@ -54,7 +54,7 @@ return destination; } - public int getFlowLimiterSize() { + public int getMemorySize() { return frame.getContent().length; } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Apr 27 18:40:44 2009 @@ -25,11 +25,13 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import org.apache.activemq.WindowLimiter; import org.apache.activemq.broker.BrokerConnection; +import org.apache.activemq.broker.BrokerMessageDelivery; import org.apache.activemq.broker.DeliveryTarget; import org.apache.activemq.broker.Destination; import org.apache.activemq.broker.MessageDelivery; @@ -49,6 +51,7 @@ import org.apache.activemq.flow.SizeLimiter; import org.apache.activemq.flow.ISinkController.FlowControllable; import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.SingleFlowRelay; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.stomp.Stomp; @@ -99,7 +102,7 @@ } }); actionHandlers.put(Stomp.Commands.SEND, new ActionHander() { - + public void onStompFrame(StompFrame frame) throws Exception { String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION); Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest); @@ -276,7 +279,8 @@ private LinkedHashMap sentMessageIds = new LinkedHashMap(); private boolean durable; - private AsciiBuffer durableQueueName; + private QueueStore.QueueDescriptor durableQueueId; + private AtomicLong deliverySequence = new AtomicLong(0); public ConsumerContext(final StompFrame subscribe) throws Exception { translator = translator(subscribe); @@ -359,6 +363,10 @@ return queue; } + public boolean hasSelector() { + return false; + } + public boolean match(MessageDelivery message) { StompFrame stompMessage = message.asType(StompFrame.class); if (stompMessage == null) { @@ -394,7 +402,7 @@ if (isDurable() && delivery.isPersistent()) { try { - delivery.persist(durableQueueName, true); + delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -512,7 +520,7 @@ return null; } - public MessageDelivery createMessageDelivery(MessageRecord record) { + public BrokerMessageDelivery createMessageDelivery(MessageRecord record) { throw new UnsupportedOperationException(); } }