Author: chirino
Date: Mon Mar 30 16:20:28 2009
New Revision: 760028
URL: http://svn.apache.org/viewvc?rev=760028&view=rev
Log:
Applying Colin's patch from https://issues.apache.org/activemq/browse/AMQ-2187
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/
activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db
activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,56 +16,86 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.PersistentQueue;
+import org.apache.activemq.protobuf.AsciiBuffer;
public abstract class BrokerMessageDelivery implements MessageDelivery {
- HashSet<PersistentQueue<MessageDelivery>> persistentTargets;
- // Indicates whether or not the message has already been saved
- // if it hasn't in memory updates can be done.
+ HashSet<AsciiBuffer> persistentTargets;
+ // Indicates whether or not the message has been saved to the
+ // database, if not then in memory updates can be done.
boolean saved = false;
long storeTracking = -1;
BrokerDatabase store;
+ boolean fromStore = false;
+ boolean enableFlushDelay = true;
+ OperationContext saveContext;
+ boolean cancelled = false;
+
+ /**
+  * Records whether this delivery was loaded from the message store.
+  *
+  * @param val {@code true} if the delivery came from the store,
+  *            {@code false} otherwise
+  */
+ public void setFromStore(boolean val) {
+     // Honor the argument: the flag must be clearable as well as settable.
+     fromStore = val;
+ }
public final boolean isFromStore() {
- return false;
+ return fromStore;
}
- public final void persist(PersistentQueue<MessageDelivery> queue) {
+ public final void persist(AsciiBuffer queue, boolean delayable) throws IOException {
synchronized (this) {
if (!saved) {
if (persistentTargets == null) {
- persistentTargets = new HashSet<PersistentQueue<MessageDelivery>>();
+ persistentTargets = new HashSet<AsciiBuffer>();
}
persistentTargets.add(queue);
return;
}
+ if (!delayable) {
+ enableFlushDelay = false;
+ }
}
-
- //TODO probably need to pass in the saving queue's source controller here
- //and treat it like it is dispatching to the saver queue.
+
+ // TODO probably need to pass in the saving queue's source controller
+ // here and treat it like it is dispatching to the saver queue.
store.saveMessage(this, queue, null);
}
- public final void delete(PersistentQueue<MessageDelivery> queue) {
+ public final void delete(AsciiBuffer queue) {
+ boolean firePersistListener = false;
synchronized (this) {
if (!saved) {
persistentTargets.remove(queue);
- return;
+ if (persistentTargets.isEmpty()) {
+ if (saveContext != null) {
+
+ if (!cancelled) {
+ if (saveContext.cancel()) {
+ cancelled = true;
+ firePersistListener = true;
+ }
+
+ saved = true;
+ }
+ }
+ }
+ } else {
+ store.deleteMessage(this, queue);
}
}
- store.deleteMessage(this, queue);
+ if (firePersistListener) {
+ onMessagePersisted();
+ }
+
}
- public synchronized void beginStore(long storeTracking) {
- saved = true;
+ public void setStoreTracking(long storeTracking) {
this.storeTracking = storeTracking;
}
@@ -73,21 +103,47 @@
return storeTracking;
}
- public Collection<PersistentQueue<MessageDelivery>> getPersistentQueues() {
+ public Collection<AsciiBuffer> getPersistentQueues() {
return persistentTargets;
}
- public void persistIfNeeded(ISourceController<?> controller) {
- boolean saveNeeded = false;
+ public void beginStore() {
synchronized (this) {
- if (persistentTargets.isEmpty()) {
+ saved = true;
+ }
+ }
+
+ public void persistIfNeeded(ISourceController<?> controller) throws IOException {
+ boolean firePersistListener = false;
+ synchronized (this) {
+ boolean saveNeeded = true;
+ if (persistentTargets == null || persistentTargets.isEmpty()) {
saveNeeded = false;
saved = true;
}
+
+ // If any of the targets requested save then save the message
+ // Note that this could be the case even if the message isn't
+ // persistent if a target requested that the message be spooled
+ // for some other reason such as queue memory overflow.
+ if (saveNeeded) {
+ saveContext = store.persistReceivedMessage(this, controller);
+ }
+ // If none of the targets required persistence, then fire the
+ // persist listener:
+ else if (isResponseRequired() && isPersistent()) {
+ firePersistListener = true;
+ }
}
- if (saveNeeded) {
- store.persistReceivedMessage(this, controller);
+ if (firePersistListener) {
+ onMessagePersisted();
}
+
+ }
+
+ public boolean isFlushDelayable() {
+ // TODO Auto-generated method stub
+ return enableFlushDelay;
}
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Mar 30 16:20:28 2009
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener;
+import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+import org.apache.activemq.queue.Store;
+import org.apache.activemq.queue.Subscription;
+
+public class DBQueueStore<K> implements Store<K, MessageDelivery> {
+
+ private final BrokerDatabase database;
+ private final AsciiBuffer queue;
+ private final MessageRetriever retriever;
+
+ private long firstKey = -1;
+ private long lastKey = -1;
+
+ private int count = 0;
+ private boolean loading = true;
+
+ protected HashMap<K, DBStoreNode> map = new HashMap<K, DBStoreNode>();
+ protected TreeMap<Long, DBStoreNode> order = new TreeMap<Long, DBStoreNode>();
+ private Mapper<K, MessageDelivery> keyExtractor;
+
+ /**
+  * Creates a store for one queue of the given database and immediately
+  * asks the retriever to start restoring that queue's stored messages.
+  *
+  * @param database   database used to restore and delete messages
+  * @param queue      name of the queue this store serves
+  * @param dispatcher dispatcher the internal message retriever registers with
+  */
+ DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) {
+     this.queue = queue;
+     this.database = database;
+     // The retriever reads both fields above, so it is created last.
+     // NOTE(review): keyExtractor is still null here; setKeyMapper() has to
+     // run before restored messages are dispatched - confirm the ordering.
+     this.retriever = new MessageRetriever(dispatcher);
+     this.retriever.start();
+ }
+
+ /**
+  * Adds a delivery to this store.
+  * <p>
+  * A delivery is accepted only when its store tracking number is greater
+  * than {@code lastKey}, the highest tracking number recorded while
+  * restoring messages.
+  *
+  * @param key      key supplied by the caller (forwarded to addInternal)
+  * @param delivery delivery to add
+  * @return the node created for the delivery
+  * @throws IllegalArgumentException if the tracking number is not greater
+  *         than {@code lastKey}
+  */
+ public StoreNode<K, MessageDelivery> add(K key, MessageDelivery delivery) {
+     final boolean alreadyCovered = delivery.getStoreTracking() <= lastKey;
+     if (alreadyCovered) {
+         throw new IllegalArgumentException(this + " Duplicate key: " + delivery);
+     }
+     return addInternal(key, delivery);
+ }
+
+ /**
+  * Installs the mapper that derives a node's key from its delivery.
+  *
+  * @param keyExtractor mapper consulted whenever a node is created or a
+  *                     restored message is indexed
+  */
+ public void setKeyMapper(Mapper<K, MessageDelivery> keyExtractor) {
+     this.keyExtractor = keyExtractor;
+ }
+
+ private DBStoreNode addInternal(K key, MessageDelivery delivery) {
+ DBStoreNode node = new DBStoreNode(delivery);
+ map.put(keyExtractor.map(delivery), node);
+ order.put(delivery.getStoreTracking(), node);
+ return node;
+ }
+
+ /**
+  * Reports whether the element counter is zero.
+  * <p>
+  * NOTE(review): nothing in this class assigns {@code count}, so this
+  * currently always returns {@code true} - confirm how the counter is meant
+  * to be maintained.
+  *
+  * @return {@code true} if {@code count} is zero
+  */
+ public boolean isEmpty() {
+     final boolean nothingCounted = (0 == count);
+     return nothingCounted;
+ }
+
+ /**
+  * Opens a new cursor over this store with no position set yet.
+  *
+  * @return a fresh cursor
+  */
+ public StoreCursor<K, MessageDelivery> openCursor() {
+     final DBStoreCursor fresh = new DBStoreCursor();
+     return fresh;
+ }
+
+ /**
+  * Opens a new cursor whose first returned element is the given node.
+  *
+  * @param next node the cursor should return first; it must be a node
+  *             created by this store, otherwise the cast fails
+  * @return a cursor positioned at {@code next}
+  */
+ public StoreCursor<K, MessageDelivery> openCursorAt(StoreNode<K, MessageDelivery> next) {
+     final DBStoreCursor positioned = new DBStoreCursor();
+     positioned.next = (DBStoreNode) next;
+     return positioned;
+ }
+
+ /**
+  * Not implemented yet: no node is removed from the store.
+  *
+  * @param key key of the node to remove (currently ignored)
+  * @return always {@code null}
+  */
+ public StoreNode<K, MessageDelivery> remove(K key) {
+     // TODO implement removal; until then callers always receive null.
+     return null;
+ }
+
+ /**
+  * Returns the element counter.
+  * <p>
+  * NOTE(review): nothing in this class assigns {@code count}, so this
+  * currently always returns 0 - confirm how the counter is meant to be
+  * maintained.
+  *
+  * @return the current value of {@code count}
+  */
+ public int size() {
+     final int counted = count;
+     return counted;
+ }
+
+ /**
+  * Cursor over the nodes of this store in ascending store-tracking order.
+  * The cursor looks nodes up in the enclosing store's {@code order} map each
+  * time it advances, so nodes indexed after the cursor was opened are seen.
+  */
+ private class DBStoreCursor implements StoreCursor<K, MessageDelivery> {
+     // Currently unused.
+     private long pos;
+     // Tracking number of the node most recently returned by next();
+     // -1 until next() has returned a node.
+     private long last = -1;
+
+     // Node most recently returned by next(); this is what remove() deletes.
+     private DBStoreNode node;
+     // Node the following call to next() will return, once determined.
+     private DBStoreNode next;
+
+     /**
+      * Not implemented yet.
+      *
+      * @return always {@code null}
+      */
+     public StoreNode<K, MessageDelivery> peekNext() {
+         // TODO Auto-generated method stub
+         return null;
+     }
+
+     /**
+      * Makes the given node the one returned by the following call to next().
+      *
+      * @param node node to return next; must have been created by this store
+      */
+     public void setNext(StoreNode<K, MessageDelivery> node) {
+         // Take the value from the argument; assigning the field "next" to
+         // itself would leave the cursor position unchanged.
+         this.next = (DBStoreNode) node;
+     }
+
+     /**
+      * Determines whether another node is available, looking up the first
+      * node whose tracking number is greater than that of the last returned
+      * node when none has been determined yet.
+      *
+      * @return {@code true} if next() can return a node
+      */
+     public boolean hasNext() {
+         if (next != null) {
+             return true;
+         }
+
+         SortedMap<Long, DBStoreNode> m = order.tailMap(last + 1);
+         if (m.isEmpty()) {
+             next = null;
+         } else {
+             next = m.get(m.firstKey());
+         }
+         return next != null;
+     }
+
+     /**
+      * Returns the next node and advances the cursor past it.
+      *
+      * @return the next node in tracking order
+      * @throws java.util.NoSuchElementException if no further node is
+      *         available (instead of failing with a NullPointerException)
+      */
+     public StoreNode<K, MessageDelivery> next() {
+         if (!hasNext()) {
+             throw new java.util.NoSuchElementException();
+         }
+         // Remember the node being handed out so that remove() can act on it.
+         node = next;
+         last = node.tracking;
+         next = null;
+         return node;
+     }
+
+     /**
+      * @return {@code true} once the enclosing store's {@code loading} flag
+      *         is false
+      */
+     public boolean isReady() {
+         return !loading;
+     }
+
+     /**
+      * Deletes the message of the node last returned by next() from the
+      * database for this queue. The in-memory indexes are not modified.
+      *
+      * @throws IllegalStateException if next() has not returned a node yet
+      */
+     public void remove() {
+         if (node == null) {
+             throw new IllegalStateException("remove() called before next()");
+         }
+         database.deleteMessage(node.delivery, queue);
+     }
+ }
+
+ /**
+  * Node wrapping one delivery held by this store. Constructing a node also
+  * hands it to the retriever, which requests persistence of the delivery for
+  * this queue.
+  */
+ private class DBStoreNode implements StoreNode<K, MessageDelivery> {
+     private MessageDelivery delivery;
+     private K key;
+     // Resource id of the sink that acquired this node, or -1 when unowned.
+     private long ownerId = -1;
+     // Store tracking number of the delivery, captured at construction.
+     private final long tracking;
+
+     /**
+      * @param delivery delivery to wrap; its tracking number and mapped key
+      *                 are read once, here
+      */
+     DBStoreNode(MessageDelivery delivery) {
+         this.tracking = delivery.getStoreTracking();
+         this.delivery = delivery;
+         this.key = keyExtractor.map(delivery);
+         retriever.save(this);
+     }
+
+     /**
+      * Attempts to take ownership of this node for the given subscription.
+      *
+      * @param owner subscription requesting the node
+      * @return {@code true} if the node was unowned or is already owned by
+      *         the same sink resource id; {@code false} otherwise
+      */
+     public boolean acquire(Subscription<MessageDelivery> owner) {
+         final long requester = owner.getSink().getResourceId();
+         final boolean heldByAnother = ownerId != -1 && requester != ownerId;
+         if (heldByAnother) {
+             return false;
+         }
+         ownerId = owner.getSink().getResourceId();
+         return true;
+     }
+
+     /** @return the key mapped from the delivery at construction time */
+     public K getKey() {
+         return this.key;
+     }
+
+     /** @return the wrapped delivery */
+     public MessageDelivery getValue() {
+         return this.delivery;
+     }
+
+     /** Releases ownership so that any subscription may acquire the node. */
+     public void unacquire() {
+         this.ownerId = -1;
+     }
+ }
+
+ private class MessageRetriever implements Dispatchable, MessageRestoreListener { // Restores this queue's stored messages from the database in batches and indexes them in the enclosing store.
+
+ private final DispatchContext dispatchContext; // handle returned by dispatcher.register(); used to request dispatch() calls
+ private AtomicBoolean loaded = new AtomicBoolean(false); // becomes true once a restore callback delivers an empty collection
+
+ private long loadCursor = 0; // NOTE(review): passed to restoreMessages() but never advanced anywhere in this class - confirm that repeated requests do not re-read the same batch
+ private long max = -1; // not read or written anywhere else in this class
+ private long loadedCount; // incremented per restored message in dispatch(); never read
+
+ private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>(); // hand-off buffer: filled by messagesRestored(), drained by dispatch()
+
+ MessageRetriever(IDispatcher dispatcher) { // registers this retriever with the dispatcher under a name derived from the queue
+ dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue);
+ }
+
+ public void save(DBStoreNode node) { // asks the node's delivery to persist itself for this queue, passing delayable = false
+ try {
+ node.delivery.persist(queue, false);
+ } catch (IOException e) {
+ // TODO: the failure is only printed; the caller still indexes the node as if the save had succeeded
+ e.printStackTrace();
+ }
+ }
+
+ public void start() { // requests the first batch of stored messages unless loading has already finished
+ if (!loaded.get()) {
+ database.restoreMessages(queue, loadCursor, 50, this); // 50 is presumably the maximum batch size - confirm against BrokerDatabase.restoreMessages
+ }
+ }
+
+ public boolean dispatch() { // drains restoredMsgs into the store's indexes, then requests another batch while loading is unfinished; always returns false
+ while (true) {
+ RestoredMessage restored = restoredMsgs.poll();
+
+ if (restored == null) { // buffer drained
+ break;
+ }
+
+ try {
+ MessageDelivery delivery = restored.getMessageDelivery();
+ addInternal(keyExtractor.map(delivery), delivery); // NOTE(review): addInternal builds a DBStoreNode, whose constructor calls save(), so restored messages get persist() invoked again - confirm this is intended
+ if (firstKey == -1) { // remember the tracking number of the first restored message
+ firstKey = delivery.getStoreTracking();
+ }
+ if (lastKey < delivery.getStoreTracking()) { // keep lastKey at the highest restored tracking number; add() rejects anything not above it
+ lastKey = delivery.getStoreTracking();
+ }
+ loadedCount++;
+
+ } catch (IOException e) {
+ // TODO: a message that fails to load is only printed and then skipped
+ e.printStackTrace();
+ }
+ }
+
+ if (!loaded.get()) { // NOTE(review): issued on every dispatch() with the unchanged loadCursor, even if an earlier request is still outstanding - confirm
+ database.restoreMessages(queue, loadCursor, 50, this);
+ }
+ return false;
+ }
+
+ public void messagesRestored(Collection<RestoredMessage> msgs) { // restore callback: buffers a non-empty batch, or marks loading complete on an empty one, then requests a dispatch
+ if (!msgs.isEmpty()) {
+ restoredMsgs.addAll(msgs);
+ } else {
+ loaded.set(true); // NOTE(review): the enclosing store's "loading" flag is not cleared here or anywhere else in this class, so DBStoreCursor.isReady() stays false - confirm
+ }
+ dispatchContext.requestDispatch();
+ }
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Mar 30 16:20:28 2009
@@ -23,6 +23,8 @@
public interface DeliveryTarget {
+ public void deliver(MessageDelivery delivery, ISourceController<?> source);
+
public IFlowSink<MessageDelivery> getSink();
public boolean match(MessageDelivery message);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Mar 30 16:20:28 2009
@@ -23,6 +23,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Connection;
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.transport.DispatchableTransportServer;
@@ -45,6 +47,7 @@
private String connectUri;
private String name;
private IDispatcher dispatcher;
+ private BrokerDatabase database;
private final AtomicBoolean stopping = new AtomicBoolean();
public String getName() {
@@ -62,12 +65,19 @@
for (VirtualHost virtualHost : virtualHosts.values()) {
virtualHost.stop();
}
+ database.stop();
dispatcher.shutdown();
}
public final void start() throws Exception {
dispatcher.start();
+ if (database != null) {
+ database.start();
+ } else {
+ throw new Exception("Store not initialized");
+ }
+ addVirtualHost(getDefaultVirtualHost());
for (VirtualHost virtualHost : virtualHosts.values()) {
virtualHost.start();
@@ -134,12 +144,16 @@
}
// /////////////////////////////////////////////////////////////////
- // Virtual Host Related Opperations
+ // Virtual Host Related Operations
// /////////////////////////////////////////////////////////////////
public VirtualHost getDefaultVirtualHost() {
synchronized (virtualHosts) {
- if( defaultVirtualHost==null ) {
+ if (defaultVirtualHost == null) {
defaultVirtualHost = new VirtualHost();
+ defaultVirtualHost.setDatabase(database);
+ ArrayList<AsciiBuffer> names = new ArrayList<AsciiBuffer>(1);
+ names.add(new AsciiBuffer("default"));
+ defaultVirtualHost.setHostNames(names);
}
return defaultVirtualHost;
}
@@ -174,6 +188,7 @@
setDefaultVirtualHost(host);
}
}
+ host.setDatabase(database);
}
public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
@@ -181,9 +196,10 @@
for (AsciiBuffer name : host.getHostNames()) {
virtualHosts.remove(name);
}
- // Was the default virtual host removed? Set the default to the next virtual host.
- if( host == defaultVirtualHost ) {
- if( virtualHosts.isEmpty() ) {
+ // Was the default virtual host removed? Set the default to the next
+ // virtual host.
+ if (host == defaultVirtualHost) {
+ if (virtualHosts.isEmpty()) {
defaultVirtualHost = null;
} else {
defaultVirtualHost = virtualHosts.values().iterator().next();
@@ -204,4 +220,8 @@
}
}
+ public void setStore(Store store) {
+ database = new BrokerDatabase(store, dispatcher);
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
+
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.PersistentQueue;
public interface MessageDelivery {
@@ -53,13 +54,19 @@
*/
public void onMessagePersisted();
- public Store.MessageRecord createMessageRecord();
+ public Store.MessageRecord createMessageRecord() throws IOException;
public Buffer getTransactionId();
- public void persist(PersistentQueue<MessageDelivery> queue);
+ public void persist(AsciiBuffer queue, boolean delayable) throws IOException;
- public void delete(PersistentQueue<MessageDelivery> queue);
+ public void delete(AsciiBuffer queue);
+
+ /**
+ * Sets the unique storage tracking number.
+ * @param tracking The tracking number.
+ */
+ public void setStoreTracking(long tracking);
/**
* Gets the tracking number used to identify this message in the message
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
import java.util.HashMap;
import org.apache.activemq.broker.DeliveryTarget;
@@ -51,6 +52,10 @@
protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
return createSharedFlowQueue();
}
+
+ public boolean isElementPersistent(MessageDelivery elem) {
+ return elem.isPersistent();
+ }
};
queue.setPartitionMapper(partitionMapper);
queue.setResourceName(destination.getName().toString());
@@ -74,6 +79,9 @@
SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
+ //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
+ //store.setKeyMapper(keyExtractor);
+ //queue.setStore(store);
queue.setDispatcher(broker.getDispatcher());
return queue;
} else {
@@ -81,13 +89,26 @@
SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
queue.setKeyMapper(keyExtractor);
queue.setAutoRelease(true);
+ //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
+ //store.setKeyMapper(keyExtractor);
+ //queue.setStore(store);
queue.setDispatcher(broker.getDispatcher());
return queue;
}
}
- public final void deliver(ISourceController<MessageDelivery> source, MessageDelivery msg) {
- queue.add(msg, source);
+ public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
+ try {
+ if(delivery.isPersistent())
+ {
+ delivery.persist(destination.getName(), true);
+ }
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ queue.add(delivery, source);
}
public final Destination getDestination() {
@@ -180,5 +201,4 @@
public boolean isDurable() {
return true;
}
-
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,12 +27,9 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.QueueDomain;
import org.apache.activemq.broker.TopicDomain;
-import org.apache.activemq.broker.store.Store.Callback;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.broker.store.BrokerDatabase;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
final public class Router {
@@ -40,6 +38,7 @@
private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
private VirtualHost virtualHost;
+ private BrokerDatabase database;
public Router() {
domains.put(QUEUE_DOMAIN, new QueueDomain());
@@ -90,8 +89,9 @@
//
Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
- msg.store = getVirtualHost().getDatabase();
-
+ msg.store = database;
+ msg.setStoreTracking(msg.store.allocateStoreTracking());
+
// TODO:
// Consider doing some caching of this target list. Most producers
// always send to the same destination.
@@ -108,15 +108,26 @@
}
}
- //The sinks will request persistence via MessageDelivery.persist()
- //if they require persistence:
+ // The sinks will request persistence via MessageDelivery.persist()
+ // if they require persistence:
for (DeliveryTarget dt : targets) {
- if (dt.match(msg)) {
- dt.getSink().add(msg, controller);
- }
+ dt.deliver(msg, controller);
+ //if (dt.match(msg)) {
+ //
+ // dt.getSink().add(msg, controller);
+ //}
}
- msg.persistIfNeeded(controller);
+ try {
+ msg.persistIfNeeded(controller);
+ } catch (IOException ioe) {
+ //TODO: Error serializing the message, this should trigger an error
+ //This is a pretty severe error as we've already delivered
+ //the message to the recipients. If we send an error response
+ //back it could result in a duplicate. Does this mean that we
+ //should persist the message prior to sending to the recips?
+ ioe.printStackTrace();
+ }
} else {
// Let the client know we got the message even though there
@@ -147,10 +158,15 @@
public void setVirtualHost(VirtualHost virtualHost) {
this.virtualHost = virtualHost;
+ this.database = virtualHost.getDatabase();
}
public VirtualHost getVirtualHost() {
return virtualHost;
}
+ public void setDatabase(BrokerDatabase database) {
+ this.database = database;
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Mar 30 16:20:28 2009
@@ -31,10 +31,11 @@
final private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
private Router router;
- private BrokerDatabase database = new BrokerDatabase();
+ private BrokerDatabase database;
public VirtualHost() {
- setRouter(new Router());
+ this.router = new Router();
+ this.router.setVirtualHost(this);
}
public AsciiBuffer getHostName() {
@@ -54,10 +55,6 @@
public Router getRouter() {
return router;
}
- public void setRouter(Router router) {
- this.router = router;
- this.router.setVirtualHost(this);
- }
public void start() throws Exception {
for (Queue queue : queues.values()) {
@@ -81,6 +78,7 @@
public void setDatabase(BrokerDatabase store) {
this.database = store;
+ router.setDatabase(database);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 30 16:20:28 2009
@@ -16,12 +16,16 @@
*/
package org.apache.activemq.broker.openwire;
+import java.io.IOException;
+
import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.util.ByteSequence;
public class OpenWireMessageDelivery extends BrokerMessageDelivery {
@@ -30,6 +34,7 @@
private final Message message;
private Destination destination;
private AsciiBuffer producerId;
+ private OpenWireFormat storeWireFormat;
private PersistListener persistListener = null;
public interface PersistListener {
@@ -60,7 +65,7 @@
}
public AsciiBuffer getMsgId() {
- return null;
+ return new AsciiBuffer(message.getMessageId().toString());
}
public AsciiBuffer getProducerId() {
@@ -100,13 +105,12 @@
return message.isResponseRequired();
}
-
- public MessageRecord createMessageRecord() {
+ public MessageRecord createMessageRecord() throws IOException {
MessageRecord record = new MessageRecord();
record.setEncoding(ENCODING);
- // TODO: Serialize it..
- // record.setBuffer()
- // record.setStreamKey(stream);
+ ByteSequence bytes = storeWireFormat.marshal(message);
+ record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
+ record.setStreamKey((long) 0);
record.setMessageId(getMsgId());
return record;
}
@@ -116,4 +120,7 @@
return null;
}
+ public void setStoreWireFormat(OpenWireFormat wireFormat) {
+ this.storeWireFormat = wireFormat;
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.broker.openwire;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -51,6 +53,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
@@ -66,21 +69,25 @@
import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowResource;
import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
@@ -88,30 +95,13 @@
protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
- protected final Object inboundMutex = new Object();
- protected IFlowController<OpenWireMessageDelivery> inboundController;
-
protected BrokerConnection connection;
private OpenWireFormat wireFormat;
+ private OpenWireFormat storeWireFormat;
private Router router;
public void start() throws Exception {
- // Setup the inbound processing..
- final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false);
- SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
- inboundController = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
- public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
- if (elem.isResponseRequired()) {
- elem.setPersistListener(OpenwireProtocolHandler.this);
- }
- router.route(elem, controller);
- controller.elementDispatched(elem);
- }
- public String toString() {
- return flow.getFlowName();
- }
- }, flow, limiter, inboundMutex);
}
public void stop() throws Exception {
@@ -172,6 +162,7 @@
ProducerContext producerContext = producers.get(producerId);
OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
+ md.setStoreWireFormat(storeWireFormat);
// Only producers that are not using a window will block,
// and if it blocks.
@@ -346,11 +337,10 @@
}.start();
}
}
-
public void onMessagePersisted(OpenWireMessageDelivery delivery) {
// TODO This method should not block:
- // Either add to output queue, or spin off in a separate thread.
+ // Either add to output queue, or spin off in a separate thread.
ack(delivery.getMessage());
}
@@ -367,52 +357,44 @@
// Internal Support Methods
// /////////////////////////////////////////////////////////////////
- static class FlowControllableAdapter implements FlowControllable<OpenWireMessageDelivery> {
- public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
- }
-
- public IFlowSink<OpenWireMessageDelivery> getFlowSink() {
- return null;
- }
-
- public IFlowSource<OpenWireMessageDelivery> getFlowSource() {
- return null;
- }
- }
-
- class ProducerContext {
+ class ProducerContext extends AbstractLimitedFlowResource<OpenWireMessageDelivery> {
+ protected final Object inboundMutex = new Object();
private IFlowController<OpenWireMessageDelivery> controller;
private String name;
public ProducerContext(final ProducerInfo info) {
- this.name = info.getProducerId().toString();
+ super(info.getProducerId().toString());
+ final Flow flow = new Flow("broker-" + name + "-inbound", false);
// Openwire only uses credit windows at the producer level for
// producers that request the feature.
+ IFlowLimiter<OpenWireMessageDelivery> limiter;
if (info.getWindowSize() > 0) {
- final Flow flow = new Flow("broker-" + name + "-inbound", false);
- WindowLimiter<OpenWireMessageDelivery> limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+ limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
@Override
protected void sendCredit(int credit) {
ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
connection.write(ack);
}
};
-
- controller = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
- public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
- router.route(msg, controller);
- controller.elementDispatched(msg);
- }
-
- public String toString() {
- return flow.getFlowName();
- }
- }, flow, limiter, inboundMutex);
} else {
- controller = inboundController;
+
+ limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
}
+
+ controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
+ public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
+ router.route(msg, controller);
+ controller.elementDispatched(msg);
+ }
+
+ public IFlowResource getFlowResource() {
+ return ProducerContext.this;
+ }
+ }, flow, limiter, inboundMutex);
+
+ super.onFlowOpened(controller);
}
}
@@ -422,13 +404,27 @@
private String name;
private BooleanExpression selector;
private boolean durable;
+ private AsciiBuffer durableQueueName;
private SingleFlowRelay<MessageDelivery> queue;
public WindowLimiter<MessageDelivery> limiter;
+ HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
+ LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+
public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
this.info = info;
this.name = info.getConsumerId().toString();
+ durable = info.isDurable();
+ if (durable) {
+ durableQueueName = new AsciiBuffer(info.getSubscriptionName());
+ try {
+ connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName);
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ }
+ }
+
selector = parseSelector(info);
Flow flow = new Flow("broker-" + name + "-outbound", false);
@@ -445,6 +441,13 @@
md.setConsumerId(info.getConsumerId());
md.setMessage(msg);
md.setDestination(msg.getDestination());
+ // Add to the pending list if persistent and we are durable:
+ if (isDurable() && message.isPersistent()) {
+ synchronized (queue) {
+ pendingMessages.put(msg.getMessageId(), message);
+ pendingMessageIds.add(msg.getMessageId());
+ }
+ }
connection.write(md);
};
});
@@ -452,6 +455,19 @@
public void ack(MessageAck info) {
synchronized (queue) {
+ if (isDurable()) {
+ MessageId id = info.getLastMessageId();
+ while (!pendingMessageIds.isEmpty()) {
+ MessageId pendingId = pendingMessageIds.peekFirst();
+ MessageDelivery delivery = pendingMessages.remove(pendingId);
+ delivery.delete(durableQueueName);
+ pendingMessageIds.removeFirst();
+ if (pendingId.equals(id)) {
+ break;
+ }
+ }
+
+ }
limiter.onProtocolCredit(info.getMessageCount());
}
}
@@ -460,6 +476,23 @@
return queue;
}
+ public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
+ if (!match(delivery)) {
+ return;
+ }
+
+ if (isDurable() && delivery.isPersistent()) {
+ try {
+ delivery.persist(durableQueueName, true);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ queue.add(delivery, source);
+ }
+
public boolean match(MessageDelivery message) {
Message msg = message.asType(Message.class);
if (msg == null) {
@@ -541,9 +574,17 @@
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = (OpenWireFormat) wireFormat;
+ this.storeWireFormat = this.wireFormat.copy();
+ storeWireFormat.setCacheEnabled(false);
+ storeWireFormat.setTightEncodingEnabled(false);
+ storeWireFormat.setSizePrefixDisabled(false);
}
- public MessageDelivery createMessageDelivery(MessageRecord record) {
- throw new UnsupportedOperationException();
+ public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+ Buffer buf = record.getBuffer();
+ Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length));
+ OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);
+ delivery.setStoreWireFormat(storeWireFormat);
+ return delivery;
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.protocol;
+import java.io.IOException;
+
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.MessageDelivery;
@@ -29,5 +31,6 @@
public void onException(Exception error);
public void setWireFormat(WireFormat wf);
- public MessageDelivery createMessageDelivery(MessageRecord record);
+ public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 30 16:20:28 2009
@@ -38,12 +38,13 @@
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowResource;
import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -63,12 +64,11 @@
public void onStompFrame(StompFrame frame) throws Exception;
}
+ private InboundContext inboundContext;
+
protected final HashMap<String, ActionHander> actionHandlers = new HashMap<String, ActionHander>();
protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
- protected final Object inboundMutex = new Object();
- protected IFlowController<StompMessageDelivery> inboundController;
-
protected BrokerConnection connection;
// TODO: need to update the FrameTranslator to normalize to new broker API
@@ -99,15 +99,14 @@
}
});
actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
+
public void onStompFrame(StompFrame frame) throws Exception {
String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
frame.setAction(Stomp.Responses.MESSAGE);
StompMessageDelivery md = new StompMessageDelivery(frame, destination);
- while (!inboundController.offer(md, null)) {
- inboundController.waitForFlowUnblock();
- }
+ inboundContext.onReceive(md);
}
});
actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander() {
@@ -147,22 +146,7 @@
}
public void start() throws Exception {
- // Setup the inbound processing..
- final Flow inboundFlow = new Flow("broker-" + connection.getName() + "-inbound", false);
- SizeLimiter<StompMessageDelivery> inLimiter = new SizeLimiter<StompMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
- inboundController = new FlowController<StompMessageDelivery>(new FlowControllableAdapter() {
- public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
- if (elem.isResponseRequired()) {
- elem.setPersistListener(StompProtocolHandler.this);
- }
- router.route(elem, controller);
- controller.elementDispatched(elem);
- }
-
- public String toString() {
- return inboundFlow.getFlowName();
- }
- }, inboundFlow, inLimiter, inboundMutex);
+ inboundContext = new InboundContext();
Flow outboundFlow = new Flow("broker-" + connection.getName() + "-outbound", false);
SizeLimiter<MessageDelivery> outLimiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
@@ -240,16 +224,40 @@
// /////////////////////////////////////////////////////////////////
// Internal Support Methods
// /////////////////////////////////////////////////////////////////
- static class FlowControllableAdapter implements FlowControllable<StompMessageDelivery> {
- public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
- }
- public IFlowSink<StompMessageDelivery> getFlowSink() {
- return null;
+ class InboundContext extends AbstractLimitedFlowResource<StompMessageDelivery> {
+ protected final Object inboundMutex = new Object();
+ protected IFlowController<StompMessageDelivery> inboundController;
+
+ InboundContext() {
+ super("broker-" + connection.getName() + "-inbound");
+ // Setup the inbound processing..
+ final Flow inboundFlow = new Flow(getResourceName(), false);
+ SizeLimiter<StompMessageDelivery> inLimiter = new SizeLimiter<StompMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+ inboundController = new FlowController<StompMessageDelivery>(new FlowControllable<StompMessageDelivery>() {
+ public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
+ if (elem.isResponseRequired()) {
+ elem.setPersistListener(StompProtocolHandler.this);
+ }
+ router.route(elem, controller);
+ controller.elementDispatched(elem);
+ }
+
+ public String toString() {
+ return inboundFlow.getFlowName();
+ }
+
+ public IFlowResource getFlowResource() {
+ return InboundContext.this;
+ }
+ }, inboundFlow, inLimiter, inboundMutex);
+ super.onFlowOpened(inboundController);
}
- public IFlowSource<StompMessageDelivery> getFlowSource() {
- return null;
+ public void onReceive(StompMessageDelivery md) throws InterruptedException {
+ while (!inboundController.offer(md, null)) {
+ inboundController.waitForFlowUnblock();
+ }
}
}
@@ -268,6 +276,7 @@
private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
private boolean durable;
+ private AsciiBuffer durableQueueName;
public ConsumerContext(final StompFrame subscribe) throws Exception {
translator = translator(subscribe);
@@ -378,6 +387,24 @@
// }
}
+ public void deliver(MessageDelivery delivery, ISourceController<?> source) {
+ if (!match(delivery)) {
+ return;
+ }
+
+ if (isDurable() && delivery.isPersistent()) {
+ try {
+ delivery.persist(durableQueueName, true);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ queue.add(delivery, source);
+
+ }
+
public boolean isDurable() {
return durable;
}
@@ -407,9 +434,10 @@
connection.write(errorMessage);
}
- //Callback from MessageDelivery when message's persistence guarantees are met.
+ // Callback from MessageDelivery when message's persistence guarantees are
+ // met.
public void onMessagePersisted(StompMessageDelivery delivery) {
- //TODO this method must not block:
+ // TODO this method must not block:
ack(delivery.getStomeFame());
}
@@ -483,7 +511,7 @@
// TODO Auto-generated method stub
return null;
}
-
+
public MessageDelivery createMessageDelivery(MessageRecord record) {
throw new UnsupportedOperationException();
}
|