activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r773616 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/broker/store/ main/java/o...
Date Mon, 11 May 2009 16:25:10 GMT
Author: chirino
Date: Mon May 11 16:25:10 2009
New Revision: 773616

URL: http://svn.apache.org/viewvc?rev=773616&view=rev
Log:
Applying Colin's patch https://issues.apache.org/activemq/browse/AMQ-2244
Thanks!


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon May 11 16:25:10 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
@@ -39,7 +40,7 @@
 
     // List of persistent targets for which the message should be saved
     // when dispatch is complete:
-    HashMap<QueueStore.QueueDescriptor, Long> persistentTargets;
+    HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
 
     long storeTracking = -1;
     BrokerDatabase store;
@@ -73,7 +74,7 @@
         return fromStore;
     }
 
-    public final void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long queueSequence, boolean delayable) throws IOException {
+    public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable){
         synchronized (this) {
             // Can flush of this message to the store be delayed?
             if (enableFlushDelay && !delayable) {
@@ -84,15 +85,15 @@
             // finished:
             if (dispatching) {
                 if (persistentTargets == null) {
-                    persistentTargets = new HashMap<QueueStore.QueueDescriptor, Long>();
+                    persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
                 }
-                persistentTargets.put(queue, queueSequence);
+                persistentTargets.put(elem.getQueueDescriptor(), elem);
                 return;
             }
             // Otherwise, if it is still in the saver queue, we can add this
             // queue to the queue list:
             else if (pendingSave != null) {
-                persistentTargets.put(queue, queueSequence);
+                persistentTargets.put(elem.getQueueDescriptor(), elem);
                 if (!delayable) {
                     pendingSave.requestFlush();
                 }
@@ -100,7 +101,7 @@
             }
         }
 
-        store.saveMessage(this, queue, queueSequence, controller);
+        store.saveMessage(elem, controller, delayable);
     }
 
     public final void acknowledge(QueueStore.QueueDescriptor queue) {
@@ -152,7 +153,7 @@
         return storeTracking;
     }
 
-    public Set<Entry<QueueDescriptor, Long>> getPersistentQueues() {
+    public Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> getPersistentQueues() {
         return persistentTargets.entrySet();
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Mon May 11 16:25:10 2009
@@ -31,6 +31,7 @@
 import org.apache.activemq.queue.IPartitionedQueue;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.PartitionedQueue;
+import org.apache.activemq.queue.PersistencePolicy;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SharedPriorityQueue;
 import org.apache.activemq.queue.SharedQueue;
@@ -50,12 +51,33 @@
 
     private final short PARTITION_TYPE = 0;
     private final short SHARED_QUEUE_TYPE = 1;
-    //private final short SUBSCRIBER_QUEUE_TYPE = 2;
+    // private final short SUBSCRIBER_QUEUE_TYPE = 2;
 
     private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-    //private final HashMap<String, IFlowQueue<MessageDelivery>> subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
+    // private final HashMap<String, IFlowQueue<MessageDelivery>>
+    // subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
 
     private Mapper<Integer, MessageDelivery> partitionMapper;
+    
+    private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
+        public Long map(MessageDelivery element) {
+            return element.getExpiration();
+        }
+    };
+
+    private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+        public boolean isPersistent(MessageDelivery elem) {
+            return elem.isPersistent();
+        }
+
+        public boolean isPageOutPlaceHolders() {
+            return false;
+        }
+
+        public boolean isPagingEnabled() {
+            return false;
+        }
+    };
 
     public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
         public Integer map(MessageDelivery element) {
@@ -202,6 +224,8 @@
         ret.getDescriptor().setApplicationType(PARTITION_TYPE);
         ret.setDispatcher(dispatcher);
         ret.setStore(this);
+        ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
+        ret.setExpirationMapper(EXPIRATION_MAPPER);
 
         return ret;
     }
@@ -210,16 +234,12 @@
         elem.acknowledge(descriptor);
     }
 
-    public final boolean isElemPersistent(MessageDelivery elem) {
-        return elem.isPersistent();
-    }
-
     public final boolean isFromStore(MessageDelivery elem) {
         return elem.isFromStore();
     }
 
-    public final void persistQueueElement(QueueStore.QueueDescriptor descriptor, ISourceController<?> controller, MessageDelivery elem, long sequence, boolean delayable) throws Exception {
-        elem.persist(descriptor, controller, sequence, delayable);
+    public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+        elem.getElement().persist(elem, controller, delayable);
     }
 
     public final void restoreQueueElements(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon May 11 16:25:10 2009
@@ -16,18 +16,20 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.IOException;
-
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 public interface MessageDelivery {
 
     public Destination getDestination();
 
+    /**
+     * @return the message priority.
+     */
     public int getPriority();
 
     public int getFlowLimiterSize();
@@ -38,15 +40,24 @@
 
     public <T> T asType(Class<T> type);
 
+    /**
+     * @return true if the delivery is persistent
+     */
     public boolean isPersistent();
 
     /**
+     * @return a positive value indicates that the delivery has an expiration
+     *         time.
+     */
+    public long getExpiration();
+
+    /**
      * @return True if this message was read from the store.
      */
     public boolean isFromStore();
 
     /**
-     * Returns true if this message requires acknowledgement.
+     * Returns true if this message requires acknowledgment.
      */
     public boolean isResponseRequired();
 
@@ -56,32 +67,46 @@
      */
     public void onMessagePersisted();
 
-    public Store.MessageRecord createMessageRecord() throws IOException;
+    /**
+     * @return A message record for the element that can be persisted to the
+     *         message store.
+     */
+    public Store.MessageRecord createMessageRecord();
 
+    /**
+     * @return if the message is part of a transaction this returns the
+     *         transaction id.
+     */
     public Buffer getTransactionId();
 
     /**
-     * Asynchronously persists a message in the store.
+     * Called by a queue to request that the element be persisted. The save is
+     * done asynchronously, and depending on the state of the message delivery
+     * may not even be issued to the underlying persistence store until a later
+     * date. As such callers should use the acknowledge method to delete this
+     * message rather than directly issuing a delete through the message store
+     * itself. Direct delete from the message store is only safe once the
+     * message has been saved to the store, so callers should request
+     * notification of the save via the
+     * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
+     * to access the store directly.
      * 
-     * @param queue
-     *            The queue against which to save the message.
+     * @param elem
+     *            The element to save
      * @param controller
-     *            The source of the message.
-     * @param sequenceNumber
-     *            The sequence number of the message in the queue
+     *            A flow controller to use in the event that there isn't room in
+     *            the database.
      * @param delayable
-     *            Can be set to indicate that flush of the message can be
-     *            delayed in the hopes that an acknowledgement will negate the
-     *            need for a delete
-     * @throws IOException If there is an exception serializing the message. 
+     *            Whether or not the save operation can be delayed.
      */
-    public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException;
+    public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable);
 
     /**
-     * Acknowledges the message for a particular queue. This will cause it to be 
-     * deleted from the message store. 
+     * Acknowledges the message for a particular queue. This will cause it to be
+     * deleted from the message store.
      * 
-     * @param queue The queue for which to acknowledge the message.
+     * @param queue
+     *            The queue for which to acknowledge the message.
      */
     public void acknowledge(QueueStore.QueueDescriptor queue);
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Mon May 11 16:25:10 2009
@@ -23,71 +23,180 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
+/**
+ * @author cmacnaug
+ *
+ */
 public class MessageDeliveryWrapper implements MessageDelivery {
 
     private final MessageDelivery delegate;
-    
+
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public void acknowledge(QueueStore.QueueDescriptor queue) {
         delegate.acknowledge(queue);
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public <T> T asType(Class<T> type) {
         return delegate.asType(type);
     }
 
-    public MessageRecord createMessageRecord() throws IOException {
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
+    public MessageRecord createMessageRecord() {
         return delegate.createMessageRecord();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public Destination getDestination() {
         return delegate.getDestination();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public int getFlowLimiterSize() {
         return delegate.getFlowLimiterSize();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public AsciiBuffer getMsgId() {
         return delegate.getMsgId();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public int getPriority() {
         return delegate.getPriority();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public AsciiBuffer getProducerId() {
         return delegate.getProducerId();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public long getStoreTracking() {
         return delegate.getStoreTracking();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public Buffer getTransactionId() {
         return delegate.getTransactionId();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public boolean isFromStore() {
         return delegate.isFromStore();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public boolean isPersistent() {
         return delegate.isPersistent();
     }
+    
+    
+    /** (non-Javadoc)
+     * @see org.apache.activemq.broker.MessageDelivery#getExpiration()
+     */
+    public long getExpiration() {
+        return delegate.getExpiration();
+    }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public boolean isResponseRequired() {
         return delegate.isResponseRequired();
     }
 
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
     public void onMessagePersisted() {
         delegate.onMessagePersisted();
     }
 
-    public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
-        delegate.persist(queue, controller, sequenceNumber, delayable);
-    }
-
+    /**
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
+     *      org.apache.activemq.flow.ISourceController, boolean)
+     */
+    public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+        delegate.persist(elem, controller, delayable);
+    }
+
+    /**
+     * Creates a wrapper that forwards its calls to the given delivery.
+     * 
+     * @param delivery
+     *            The delivery to delegate to.
+     */
     MessageDeliveryWrapper(MessageDelivery delivery) {
         delegate = delivery;
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon May 11 16:25:10 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 
 public class Queue implements DeliveryTarget {
@@ -36,11 +37,10 @@
     private IQueue<Long, MessageDelivery> queue;
     private VirtualHost virtualHost;
 
-    Queue(IQueue<Long, MessageDelivery> queue)
-    {
+    Queue(IQueue<Long, MessageDelivery> queue) {
         this.queue = queue;
     }
-    
+
     public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
         queue.add(delivery, source);
     }
@@ -101,9 +101,8 @@
     public final Destination getDestination() {
         return destination;
     }
-    
-    public boolean isDurable()
-    {
+
+    public boolean isDurable() {
         return true;
     }
 
@@ -114,10 +113,6 @@
             this.target = dt;
         }
 
-        public boolean isPreAcquired() {
-            return true;
-        }
-
         public boolean matches(MessageDelivery message) {
             return target.match(message);
         }
@@ -157,7 +152,7 @@
         }
 
         @Override
-        public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
+        public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
             // We override this for queue deliveries as the sub needn't
             // persist the message
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon May 11 16:25:10 2009
@@ -108,10 +108,16 @@
         return message.isResponseRequired();
     }
 
-    public MessageRecord createMessageRecord() throws IOException {
+    public MessageRecord createMessageRecord() {
         MessageRecord record = new MessageRecord();
         record.setEncoding(ENCODING);
-        ByteSequence bytes = storeWireFormat.marshal(message);
+        
+        ByteSequence bytes;
+        try {
+            bytes = storeWireFormat.marshal(message);
+        } catch (IOException e) {
+            return null;
+        }
         record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
         record.setStreamKey((long) 0);
         record.setMessageId(getMsgId());
@@ -127,4 +133,11 @@
     public void setStoreWireFormat(OpenWireFormat wireFormat) {
         this.storeWireFormat = wireFormat;
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.MessageDelivery#getExpiration()
+     */
+    public long getExpiration() {
+        return message.getExpiration();
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon May 11 16:25:10 2009
@@ -87,6 +87,8 @@
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.transport.WireFormatNegotiator;
@@ -509,15 +511,43 @@
             return queue;
         }
 
-        public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
+        public final void deliver(final MessageDelivery delivery, ISourceController<?> source) {
             if (!match(delivery)) {
                 return;
             }
 
             if (isDurable() && delivery.isPersistent()) {
                 try {
-                    delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
-                } catch (IOException e) {
+
+                    final long sequence = deliverySequence.incrementAndGet();
+                    //TODO saveable queue element here is temporary: We should replace this 
+                    //with an actual queue implementation:
+                    delivery.persist(new SaveableQueueElement<MessageDelivery>() {
+
+                        public MessageDelivery getElement() {
+                            // TODO Auto-generated method stub
+                            return delivery;
+                        }
+
+                        public QueueDescriptor getQueueDescriptor() {
+                            // TODO Auto-generated method stub
+                            return durableQueueId;
+                        }
+
+                        public long getSequenceNumber() {
+                            return sequence;
+                        }
+
+                        public void notifySave() {
+                            //noop
+                        }
+                        public boolean requestSaveNotify() {
+                            // TODO Auto-generated method stub
+                            return false;
+                        }
+
+                    }, null, true);
+                } catch (Exception e) {
                     // TODO Auto-generated catch restoreBlock
                     e.printStackTrace();
                 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon May 11 16:25:10 2009
@@ -35,6 +35,7 @@
     private int priority = Integer.MIN_VALUE;
     private AsciiBuffer msgId;
     private PersistListener persistListener = null;
+    private long tte = Long.MIN_VALUE;
 
     public interface PersistListener {
         public void onMessagePersisted(StompMessageDelivery delivery);
@@ -69,6 +70,24 @@
         }
         return priority;
     }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.MessageDelivery#getExpiration()
+     */
+    public long getExpiration() {
+        if (tte == Long.MIN_VALUE) {
+            String t = frame.getHeaders().get(Stomp.Headers.Message.EXPIRATION_TIME);
+            try {
+                tte = (t == null) ? -1 : Long.parseLong(t);
+            } catch (NumberFormatException e) {
+                tte = 1;
+            }
+        }
+        return tte;
+    }
+    
 
     public AsciiBuffer getMsgId() {
         if (msgId == null) {
@@ -137,4 +156,6 @@
         // TODO Auto-generated method stub
         return null;
     }
+
+    
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon May 11 16:25:10 2009
@@ -53,6 +53,8 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
@@ -395,15 +397,42 @@
             // }
         }
 
-        public void deliver(MessageDelivery delivery, ISourceController<?> source) {
+        public void deliver(final MessageDelivery delivery, ISourceController<?> source) {
             if (!match(delivery)) {
                 return;
             }
 
             if (isDurable() && delivery.isPersistent()) {
                 try {
-                    delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
-                } catch (IOException e) {
+                    final long sequence = deliverySequence.incrementAndGet();
+                    //TODO saveable queue element here is temporary: We should replace this 
+                    //with an actual queue implementation:
+                    delivery.persist(new SaveableQueueElement<MessageDelivery>() {
+
+                        public MessageDelivery getElement() {
+                            // TODO Auto-generated method stub
+                            return delivery;
+                        }
+
+                        public QueueDescriptor getQueueDescriptor() {
+                            // TODO Auto-generated method stub
+                            return durableQueueId;
+                        }
+
+                        public long getSequenceNumber() {
+                            return sequence;
+                        }
+
+                        public void notifySave() {
+                            //noop
+                        }
+                        public boolean requestSaveNotify() {
+                            // TODO Auto-generated method stub
+                            return false;
+                        }
+
+                    }, null, true);
+                } catch (Exception e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon May 11 16:25:10 2009
@@ -52,6 +52,7 @@
 import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore.RestoreListener;
 import org.apache.activemq.queue.QueueStore.RestoredElement;
+import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
@@ -81,7 +82,7 @@
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 50;
+    private final long FLUSH_DELAY_MS = 5;
     private final Runnable flushDelayCallback;
 
     public interface DatabaseListener {
@@ -356,8 +357,11 @@
 
                     // If we procecessed some ops, flush and post process:
                     if (!processedQueue.isEmpty()) {
-                        // Sync the store:
+                        
+                        //System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
+                        //Sync the store:
                         store.flush();
+                        
 
                         // Post process operations
                         long release = 0;
@@ -442,17 +446,15 @@
      *             If there is an error marshalling the message.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) throws IOException {
-        return add(new AddMessageOperation(delivery), source, delivery.isFlushDelayable());
+    public OperationContext persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
+        return add(new AddMessageOperation(delivery), source, true);
     }
 
     /**
      * Saves a Message for a single queue.
      * 
-     * @param delivery
-     *            The delivery
-     * @param queue
-     *            The queue
+     * @param queueElement
+     *            The element to save.
      * @param source
      *            The source initiating the save or null, if there isn't one.
      * @throws IOException
@@ -460,8 +462,8 @@
      * 
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext saveMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue, long queueSequence, ISourceController<?> source) throws IOException {
-        return add(new AddMessageOperation(delivery, queue, queueSequence), source, false);
+    public OperationContext saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
+        return add(new AddMessageOperation(queueElement), source, delayable);
     }
 
     /**
@@ -807,7 +809,7 @@
         }
 
         public String toString() {
-            return "MessageDelete: " + queue.toString() + " tracking: " + storeTracking;
+            return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + storeTracking + " " + super.toString();
         }
     }
 
@@ -903,38 +905,34 @@
         }
 
         public String toString() {
-            return "MessageRestore: " + queue.toString() + " first: " + firstKey + " max: " + maxRecords;
+            return "MessageRestore: " + queue.getQueueName().toString() + " first: " + firstKey + " max: " + maxRecords;
         }
     }
 
     private class AddMessageOperation extends OperationBase {
 
         private final BrokerMessageDelivery brokerDelivery;
-
+        private final SaveableQueueElement<MessageDelivery> singleElement;
         private final MessageDelivery delivery;
-        private final long queueSequence;
-        private final QueueStore.QueueDescriptor target;
         private MessageRecord record;
-
+        private LinkedList<SaveableQueueElement<MessageDelivery>> notifyTargets;
         private final boolean delayable;
 
-        public AddMessageOperation(BrokerMessageDelivery delivery) throws IOException {
+        public AddMessageOperation(BrokerMessageDelivery delivery) {
             this.brokerDelivery = delivery;
+            this.singleElement = null;
             this.delivery = delivery;
-            this.queueSequence = -1;
-            target = null;
             this.delayable = delivery.isFlushDelayable();
             if (!delayable) {
                 this.record = delivery.createMessageRecord();
             }
         }
 
-        public AddMessageOperation(MessageDelivery delivery, QueueStore.QueueDescriptor target, long queueSequence) throws IOException {
+        public AddMessageOperation(SaveableQueueElement<MessageDelivery> queueElement) {
             this.brokerDelivery = null;
-            this.delivery = delivery;
-            this.target = target;
-            this.queueSequence = queueSequence;
-            this.record = delivery.createMessageRecord();
+            singleElement = queueElement;
+            delivery = queueElement.getElement();
+            this.record = singleElement.getElement().createMessageRecord();
             delayable = false;
         }
 
@@ -950,33 +948,39 @@
         @Override
         protected void doExcecute(Session session) {
 
-            if (target == null) {
+            if (singleElement == null) {
                 brokerDelivery.beginStore();
-                Set<Entry<QueueDescriptor, Long>> targets = brokerDelivery.getPersistentQueues();
+                Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> targets = brokerDelivery.getPersistentQueues();
 
                 if (!targets.isEmpty()) {
                     if (record == null) {
-                        try {
-                            record = delivery.createMessageRecord();
-                        } catch (IOException e) {
-                            throw new FatalStoreException("Error marshalling message", e);
+                        record = brokerDelivery.createMessageRecord();
+                        if (record == null) {
+                            throw new RuntimeException("Error creating message record for " + brokerDelivery.getMsgId());
                         }
                     }
-                    record.setKey(delivery.getStoreTracking());
+                    record.setKey(brokerDelivery.getStoreTracking());
                     session.messageAdd(record);
 
-                    for (Entry<QueueDescriptor, Long> target : brokerDelivery.getPersistentQueues()) {
+                    for (Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>> target : targets) {
                         try {
                             QueueRecord queueRecord = new QueueRecord();
                             queueRecord.setAttachment(null);
                             queueRecord.setMessageKey(record.getKey());
                             queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
-                            queueRecord.setQueueKey(target.getValue());
+                            queueRecord.setQueueKey(target.getValue().getSequenceNumber());
                             session.queueAddMessage(target.getKey(), queueRecord);
 
                         } catch (KeyNotFoundException e) {
                             e.printStackTrace();
                         }
+
+                        if (target.getValue().requestSaveNotify()) {
+                            if (notifyTargets == null) {
+                                notifyTargets = new LinkedList<SaveableQueueElement<MessageDelivery>>();
+                            }
+                            notifyTargets.add(target.getValue());
+                        }
                     }
                 } else {
                     // Save with no targets must have been cancelled:
@@ -991,8 +995,8 @@
                     queueRecord.setAttachment(null);
                     queueRecord.setMessageKey(record.getKey());
                     queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
-                    queueRecord.setQueueKey(queueSequence);
-                    session.queueAddMessage(target, queueRecord);
+                    queueRecord.setQueueKey(singleElement.getSequenceNumber());
+                    session.queueAddMessage(singleElement.getQueueDescriptor(), queueRecord);
                 } catch (KeyNotFoundException e) {
                     e.printStackTrace();
                 }
@@ -1001,7 +1005,18 @@
 
         @Override
         public void onCommit() {
+
+            // Notify that the message was persisted.
             delivery.onMessagePersisted();
+
+            // Notify any of the targets that requested notify on save:
+            if (singleElement != null && singleElement.requestSaveNotify()) {
+                singleElement.notifySave();
+            } else if (notifyTargets != null) {
+                for (SaveableQueueElement<MessageDelivery> notify : notifyTargets) {
+                    notify.notifySave();
+                }
+            }
         }
 
         public String toString() {
@@ -1068,6 +1083,16 @@
             return qRecord.getSize();
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.QueueStore.RestoredElement#getExpiration()
+         */
+        public long getExpiration() {
+            return qRecord.getTte();
+        }
+
     }
 
     public long allocateStoreTracking() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Mon May 11 16:25:10 2009
@@ -29,6 +29,7 @@
 import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
+import org.apache.activemq.util.TimerHeap;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
@@ -40,7 +41,7 @@
     private boolean threaded = false;
     protected final int MAX_USER_PRIORITY;
     protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
-    
+
     // Set if this dispatcher is part of a dispatch pool:
     protected final PooledDispatcher<D> pooledDispatcher;
 
@@ -53,7 +54,12 @@
     private int foreignToggle = 0;
 
     // Timed Execution List
-    protected final TimerHeap timerHeap = new TimerHeap();
+    protected final TimerHeap<Runnable> timerHeap = new TimerHeap<Runnable>() {
+        @Override
+        protected final void execute(Runnable ready) {
+            ready.run();
+        }
+    };
 
     protected final String name;
     private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
@@ -403,11 +409,11 @@
      */
     public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
         if (getCurrentDispatcher() == this) {
-            timerHeap.add(runnable, delay, timeUnit);
+            timerHeap.addRelative(runnable, delay, timeUnit);
         } else {
             new ForeignEvent() {
                 public void execute() {
-                    timerHeap.add(runnable, delay, timeUnit);
+                    timerHeap.addRelative(runnable, delay, timeUnit);
                 }
             }.addToList();
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/TimerHeap.java Mon May 11 16:25:10 2009
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.dispatch;
-
-import java.util.LinkedList;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-public class TimerHeap {
-    private final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
-    private final TimeUnit resolution = TimeUnit.NANOSECONDS;
-    
-    public final void add(Runnable runnable, long delay, TimeUnit timeUnit) {
-
-        long nanoDelay = resolution.convert(delay, timeUnit);
-        long eTime = System.nanoTime() + nanoDelay;
-        LinkedList<Runnable> list = new LinkedList<Runnable>();
-        list.add(runnable);
-
-        LinkedList<Runnable> old = timers.put(eTime, list);
-        if (old != null) {
-            list.addAll(old);
-        }
-    }
-
-    /**
-     * Returns the time of the next scheduled event.
-     * 
-     * @return -1 if there are no events, otherwise the time that the next timer
-     *         should fire.
-     */
-    public final long timeToNext(TimeUnit unit) {
-        if (timers.isEmpty()) {
-            return -1;
-        } else {
-            return unit.convert(Math.max(0, timers.firstKey() - System.nanoTime()), resolution);
-        }
-    }
-
-    /**
-     * Executes ready timers.
-     */
-    public final void executeReadyTimers() {
-        LinkedList<Runnable> ready = null;
-        if (timers.isEmpty()) {
-            return;
-        } else {
-            long now = System.nanoTime();
-            long first = timers.firstKey();
-            if (first > now) {
-                return;
-            }
-            ready = new LinkedList<Runnable>();
-
-            while (first <= now) {
-                ready.addAll(timers.remove(first));
-                if (timers.isEmpty()) {
-                    break;
-                }
-                first = timers.firstKey();
-
-            }
-        }
-
-        for (Runnable runnable : ready) {
-            try {
-                runnable.run();
-            } catch (Throwable thrown) {
-                thrown.printStackTrace();
-            }
-        }
-    }
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Mon May 11 16:25:10 2009
@@ -18,11 +18,14 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.util.Mapper;
 
-public interface IQueue<K, V> extends IFlowSink<V>{
+public interface IQueue<K, V> extends IFlowSink<V> {
 
     /**
-     * Called to initialize the queue with values from the queue store.
+     * Called to initialize the queue with values from the queue store. It is
+     * illegal to start or add elements to an uninitialized queue, and doing so
+     * will result in an {@link IllegalStateException}
      * 
      * @param sequenceMin
      *            The lowest sequence number in the store.
@@ -34,25 +37,88 @@
      *            The size of the messages in the queue
      */
     public void initialize(long sequenceMin, long sequenceMax, int count, long size);
-    
+
+    /**
+     * Gets a descriptor for the queue. The descriptor is used to store the
+     * queue in a {@link QueueStore}.
+     * 
+     * @return The queue descriptor.
+     */
     public QueueStore.QueueDescriptor getDescriptor();
-    
+
+    /**
+     * @return the number of elements currently held by the queue.
+     */
     public int getEnqueuedCount();
-    
+
+    /**
+     * @return the size of the elements currently held in the queue.
+     */
     public long getEnqueuedSize();
-    
+
+    /**
+     * Adds a subscription to the queue. When the queue is started and elements
+     * are available, they will be given to the subscription.
+     * 
+     * @param sub
+     *            The subscription to add to the queue.
+     */
     public void addSubscription(Subscription<V> sub);
 
+    /**
+     * Removes a subscription from the queue.
+     * 
+     * @param sub
+     *            The subscription to remove.
+     */
     public boolean removeSubscription(Subscription<V> sub);
-    
+
+    /**
+     * Sets a store against which the queue can persist its elements.
+     * 
+     * @param store
+     *            The store.
+     */
     public void setStore(QueueStore<K, V> store);
+
+    /**
+     * Sets a persistence policy for the queue which indicates how the queue
+     * should persist its elements.
+     * 
+     * @param persistencePolicy
+     *            The persistence policy for the queue.
+     */
+    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy);
+
+    /**
+     * Sets a mapper returning the expiration time for elements in this 
+     * queue. A positive value indicates that the message has an expiration
+     * time. 
+     * 
+     * @param expirationMapper The expiration mapper.
+     */
+    public void setExpirationMapper(Mapper<Long, V> expirationMapper);
     
+    /**
+     * Sets the dispatcher for the queue.
+     * 
+     * @param dispatcher
+     *            The dispatcher to be used by the queue.
+     */
     public void setDispatcher(IDispatcher dispatcher);
-    
+
+    /**
+     * Starts the queue.
+     */
     public void start();
-    
+
+    /**
+     * Stops the queue. Elements can still be added to the queue, but they will
+     * not be dispatched to subscriptions until the queue is again restarted.
+     * 
+     * @param dispatcher
+     *            The dispatcher to be used by the queue.
+     */
     public void stop();
-    
-    
- 
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Mon May 11 16:25:10 2009
@@ -96,6 +96,14 @@
         this.store = store;
     }
 
+    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
+        // No-Op for now.
+    }
+
+    public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
+        // No-Op for now.
+    }
+
     abstract public IQueue<K, V> createPartition(int partitionKey);
 
     public void addPartition(int partitionKey, IQueue<K, V> queue) {

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java?rev=773616&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java Mon May 11 16:25:10 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+public interface PersistencePolicy<E> {
+
+    /**
+     * Indicates whether the given element should be persisted on enqueue.
+     * 
+     * @param elem
+     *            The enqueued element.
+     * @return True if the element must be persisted on enqueue.
+     */
+    public boolean isPersistent(E elem);
+    
+    /**
+     * Indicates whether or not paging is enabled for this queue.
+     * When not enabled elements are kept in memory until they are removed. When
+     * enabled 
+     * @return Whether or not paging is enabled for the queue. 
+     */
+    public boolean isPagingEnabled();
+        
+    /**
+     * When paging is enabled this specifies whether the queue can keep 
+     * a place holder for paged out elements in memory. Keeping place holders
+     * in memory improves performance, but for very large queues keeping
+     * place holders can cause a significant overhead. 
+     * 
+     * This method must return false if {@link #isPagingEnabled()} is false.
+     * 
+     * @return True if the queue should page out place holders.
+     */
+    public boolean isPageOutPlaceHolders();
+
+    public static final class NON_PERSISTENT_POLICY<E> implements PersistencePolicy<E>{
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#isPersistent(java.lang.Object)
+         */
+        public boolean isPersistent(E elem) {
+            return false;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#isPageOutPlaceHolders()
+         */
+        public boolean isPageOutPlaceHolders() {
+            return false;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.PersistencePolicy#isPagingEnabled()
+         */
+        public boolean isPagingEnabled() {
+            return false;
+        }
+        
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java Mon May 11 16:25:10 2009
@@ -25,17 +25,22 @@
 public interface QueueStore<K, V> {
 
     public interface SaveableQueueElement<V> {
+
         /**
-         * Gets the element to save.
-         * 
-         * @return
+         * @return the descriptor of the queue for which the element should be
+         *         saved.
+         */
+        public QueueDescriptor getQueueDescriptor();
+
+        /**
+         * @return the element to save.
          */
         public V getElement();
 
         /**
-         * Gets the sequence number of the element in the queue
+         * @return the sequence number of the element in the queue
+         * 
          * 
-         * @return
          */
         public long getSequenceNumber();
 
@@ -43,14 +48,13 @@
          * @return a return value of true will cause {@link #notifySave()} to
          *         called when this element is persisted
          */
-        public boolean requestNotify();
+        public boolean requestSaveNotify();
 
         /**
          * Called when the element has been saved.
-         * 
-         * @return
          */
-        public boolean notifySave();
+        public void notifySave();
+
     }
 
     /**
@@ -70,6 +74,12 @@
         int getElementSize();
 
         /**
+         * @return A positive value indicating the expiration time if this
+         *         element is expirable.
+         */
+        long getExpiration();
+
+        /**
          * Returns the sequence number of this element in the queue
          * 
          * @return the sequence number of this element
@@ -261,32 +271,17 @@
     /**
      * Asynchronously saves the given element to the store
      * 
-     * @param descriptor
-     *            The descriptor for the queue.
+     * @param elem
+     *            The element to save
      * @param controller
      *            A flow controller to use in the event that there isn't room in
      *            the database.
-     * @param elem
-     *            The element to save
-     * @param sequence
-     *            The sequence number for the saved element
      * @param delayable
      *            Whether or not the save operation can be delayed.
      * @throws Exception
      *             If there is an error saving the element.
      */
-    public void persistQueueElement(QueueDescriptor descriptor, ISourceController<?> controller, V elem, long sequence, boolean delayable) throws Exception;
-
-    /**
-     * Tests whether or not the given element is persistent. When a message is
-     * added to a persistent queue it should be saved via
-     * {@link #persistQueueElement(QueueDescriptor, Object, long, boolean)}
-     * 
-     * @param elem
-     *            The element to check.
-     * @return True if the element requires persistence.
-     */
-    public boolean isElemPersistent(V elem);
+    public void persistQueueElement(SaveableQueueElement<V> elem, ISourceController<?> controller, boolean delayable);
 
     /**
      * Tests whether or not the given element came from the store. If so, a
@@ -299,16 +294,18 @@
     public boolean isFromStore(V elem);
 
     /**
-     * Adds a queue to the store.
+     * Asynchronously adds a queue to the store.
      * 
      * @param queue
+     *            The descriptor for the queue being added.
      */
     public void addQueue(QueueDescriptor queue);
 
     /**
-     * Deletes a queue from the store.
+     * Asynchronously deletes a queue and all of its records from the store.
      * 
      * @param queue
+     *            The descriptor for the queue to be deleted.
      */
     public void deleteQueue(QueueDescriptor queue);
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=773616&r1=773615&r2=773616&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Mon May 11 16:25:10 2009
@@ -37,8 +37,10 @@
     private IDispatcher dispatcher;
     private final PrioritySizeLimiter<V> limiter;
     private QueueStore<K, V> store;
+    private PersistencePolicy<V> persistencePolicy;
     private boolean started;
     private QueueStore.QueueDescriptor queueDescriptor;
+    private Mapper<Long, V> expirationMapper;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
@@ -77,6 +79,18 @@
         if (count > 0 || size > 0) {
             throw new IllegalArgumentException("Partioned queues do not themselves hold values");
         }
+        if (expirationMapper == null) {
+            expirationMapper = new Mapper<Long, V>() {
+
+                public Long map(V element) {
+                    return -1L;
+                }
+
+            };
+        }
+        if (persistencePolicy == null) {
+            persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
+        }
     }
 
     public synchronized int getEnqueuedCount() {
@@ -97,6 +111,14 @@
         this.store = store;
     }
 
+    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
+        this.persistencePolicy = persistencePolicy;
+    }
+
+    public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
+        this.expirationMapper = expirationMapper;
+    }
+
     public void setResourceName(String resourceName) {
         super.setResourceName(resourceName);
     }
@@ -140,6 +162,8 @@
                 queue.setDispatchPriority(prio);
                 queue.setKeyMapper(keyMapper);
                 queue.setStore(store);
+                queue.setPersistencePolicy(persistencePolicy);
+                queue.setExpirationMapper(expirationMapper);
                 queue.getDescriptor().setParent(queueDescriptor.getQueueName());
                 queue.getDescriptor().setPartitionId(prio);
                 partitions.set(prio, queue);



Mime
View raw message