activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r780141 - in /activemq/sandbox/activemq-flow/activemq-broker/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/store/ main/java/org/apache/activemq/broker/store/kahadb/ main/java/org/apache/activemq/broker/stor...
Date Fri, 29 May 2009 23:45:25 GMT
Author: chirino
Date: Fri May 29 23:45:24 2009
New Revision: 780141

URL: http://svn.apache.org/viewvc?rev=780141&view=rev
Log:
Factored out interfaces that make more sense in the store package.

Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoreListener.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoredElement.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/SaveableQueueElement.java
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Fri May 29 23:45:24 2009
@@ -22,12 +22,11 @@
 import java.util.HashMap;
 
 import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
@@ -40,7 +39,7 @@
 
     // List of persistent targets for which the message should be saved
     // when dispatch is complete:
-    HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+    HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
     SaveableQueueElement<MessageDelivery> singleTarget;
 
     long storeTracking = -1;
@@ -102,7 +101,7 @@
         store.saveMessage(elem, controller, delayable);
     }
 
-    public final void acknowledge(QueueStore.QueueDescriptor queue) {
+    public final void acknowledge(QueueDescriptor queue) {
         boolean firePersistListener = false;
         boolean deleted = false;
         synchronized (this) {
@@ -194,7 +193,7 @@
         }
 
         if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
-            persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+            persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
             persistentTargets.put(elem.getQueueDescriptor(), elem);
             persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
             singleTarget = null;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Fri May 29 23:45:24 2009
@@ -23,6 +23,9 @@
 import java.util.Iterator;
 
 import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.RestoreListener;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.ISourceController;
@@ -385,7 +388,7 @@
         return ret;
     }
 
-    public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, MessageDelivery elem) {
+    public final void deleteQueueElement(QueueDescriptor descriptor, MessageDelivery elem) {
         elem.acknowledge(descriptor);
     }
 
@@ -397,16 +400,16 @@
         elem.getElement().persist(elem, controller, delayable);
     }
 
-    public final void restoreQueueElements(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
-            org.apache.activemq.queue.QueueStore.RestoreListener<MessageDelivery> listener) {
+    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
+            org.apache.activemq.broker.store.RestoreListener<MessageDelivery> listener) {
         database.restoreMessages(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener);
     }
 
-    public final void addQueue(QueueStore.QueueDescriptor queue) {
+    public final void addQueue(QueueDescriptor queue) {
         database.addQueue(queue);
     }
 
-    public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+    public final void deleteQueue(QueueDescriptor queue) {
         database.deleteQueue(queue);
     }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java Fri May 29 23:45:24 2009
@@ -16,12 +16,12 @@
  */
 package org.apache.activemq.broker;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 public interface MessageDelivery {
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Fri May 29 23:45:24 2009
@@ -16,12 +16,12 @@
  */
 package org.apache.activemq.broker;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 /**
  * @author cmacnaug
@@ -37,7 +37,7 @@
      * @see org.apache.activemq.broker.MessageDelivery#persist(org.apache.activemq.queue.QueueStore.SaveableQueueElement,
      *      org.apache.activemq.flow.ISourceController, boolean)
      */
-    public void acknowledge(QueueStore.QueueDescriptor queue) {
+    public void acknowledge(QueueDescriptor queue) {
         delegate.acknowledge(queue);
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri May 29 23:45:24 2009
@@ -46,10 +46,6 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.RestoreListener;
-import org.apache.activemq.queue.QueueStore.RestoredElement;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
@@ -424,7 +420,7 @@
      * @param queue
      *            The queue to add.
      */
-    public void addQueue(QueueStore.QueueDescriptor queue) {
+    public void addQueue(QueueDescriptor queue) {
         add(new QueueAddOperation(queue), null, false);
     }
 
@@ -434,7 +430,7 @@
      * @param queue
      *            The queue to delete.
      */
-    public void deleteQueue(QueueStore.QueueDescriptor queue) {
+    public void deleteQueue(QueueDescriptor queue) {
         add(new QueueDeleteOperation(queue), null, false);
     }
 
@@ -479,7 +475,7 @@
      *            The queue.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext deleteMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue) {
+    public OperationContext deleteMessage(MessageDelivery delivery, QueueDescriptor queue) {
         return add(new DeleteMessageOperation(delivery.getStoreTracking(), queue), null, false);
     }
 
@@ -505,7 +501,7 @@
      *            The listener to which messags should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext restoreMessages(QueueStore.QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+    public OperationContext restoreMessages(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
         return add(new RestoreMessageOperation(queue, recordsOnly, first, maxCount, maxSequence, listener), null, true);
     }
 
@@ -721,9 +717,9 @@
 
     private class QueueAddOperation extends OperationBase {
 
-        private QueueStore.QueueDescriptor qd;
+        private QueueDescriptor qd;
 
-        QueueAddOperation(QueueStore.QueueDescriptor queue) {
+        QueueAddOperation(QueueDescriptor queue) {
             qd = queue;
         }
 
@@ -748,9 +744,9 @@
 
     private class QueueDeleteOperation extends OperationBase {
 
-        private QueueStore.QueueDescriptor qd;
+        private QueueDescriptor qd;
 
-        QueueDeleteOperation(QueueStore.QueueDescriptor queue) {
+        QueueDeleteOperation(QueueDescriptor queue) {
             qd = queue;
         }
 
@@ -771,9 +767,9 @@
 
     private class DeleteMessageOperation extends OperationBase {
         private final long storeTracking;
-        private QueueStore.QueueDescriptor queue;
+        private QueueDescriptor queue;
 
-        public DeleteMessageOperation(long tracking, QueueStore.QueueDescriptor queue) {
+        public DeleteMessageOperation(long tracking, QueueDescriptor queue) {
             this.storeTracking = tracking;
             this.queue = queue;
         }
@@ -808,7 +804,7 @@
     }
 
     private class RestoreMessageOperation extends OperationBase {
-        private QueueStore.QueueDescriptor queue;
+        private QueueDescriptor queue;
         private long firstKey;
         private int maxRecords;
         private long maxSequence;
@@ -816,7 +812,7 @@
         private RestoreListener<MessageDelivery> listener;
         private Collection<RestoredElement<MessageDelivery>> msgs = null;
 
-        RestoreMessageOperation(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
+        RestoreMessageOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
             this.queue = queue;
             this.recordsOnly = recordsOnly;
             this.firstKey = firstKey;

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java?rev=780141&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java Fri May 29 23:45:24 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public class QueueDescriptor {
+
+    public static final short SHARED = 0;
+    public static final short SHARED_PRIORITY = 1;
+    public static final short PARTITIONED = 2;
+    public static final short EXCLUSIVE = 4;
+    public static final short EXCLUSIVE_PRIORITY = 5;
+    
+    AsciiBuffer queueName;
+    AsciiBuffer parent;
+    int partitionKey;
+    short applicationType;
+    short queueType = SHARED;
+
+    public QueueDescriptor() {
+    }
+
+    public QueueDescriptor(QueueDescriptor toCopy) {
+        if (toCopy == null) {
+            return;
+        }
+        queueName = toCopy.queueName;
+        applicationType = toCopy.applicationType;
+        queueType = toCopy.queueType;
+        partitionKey = toCopy.partitionKey;
+        parent = toCopy.parent;
+    }
+
+    public QueueDescriptor copy() {
+        return new QueueDescriptor(this);
+    }
+
+    public int getPartitionKey() {
+        return partitionKey;
+    }
+
+    public void setPartitionId(int key) {
+        this.partitionKey = key;
+    }
+
+    /**
+     * Sets the application type, which is useful for querying of queues. The
+     * value must not be less than 0.
+     * 
+     * @param type
+     *            The application type of the queue.
+     */
+    public void setApplicationType(short type) {
+        if (type < 0) {
+            throw new IllegalArgumentException();
+        }
+        applicationType = type;
+    }
+
+    /**
+     * @return The application type of the queue, as set by
+     *         {@link #setApplicationType(short)}.
+     */
+    public short getApplicationType() {
+        return applicationType;
+    }
+
+    public short getQueueType() {
+        return queueType;
+    }
+
+    public void setQueueType(short type) {
+        queueType = type;
+    }
+
+    /**
+     * If this queue is a partition of a parent queue, this returns the
+     * parent queue's name.
+     * 
+     * @return The parent queue's name
+     */
+    public AsciiBuffer getParent() {
+        return parent;
+    }
+
+    /**
+     * If this queue is a partition of a parent queue, this should be set to
+     * the parent queue's name.
+     */
+    public void setParent(AsciiBuffer parent) {
+        this.parent = parent;
+    }
+
+    public AsciiBuffer getQueueName() {
+        return queueName;
+    }
+
+    public void setQueueName(AsciiBuffer queueName) {
+        this.queueName = queueName;
+    }
+
+    public int hashCode() {
+        return queueName.hashCode();
+    }
+
+    public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+        if (o == this) {
+            return true;
+        }
+
+        if (o instanceof QueueDescriptor) {
+            return equals((QueueDescriptor) o);
+        } else {
+            return false;
+        }
+    }
+
+    public boolean equals(QueueDescriptor qd) {
+        if (qd.queueName.equals(queueName)) {
+            return true;
+        }
+        return false;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoreListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoreListener.java?rev=780141&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoreListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoreListener.java Fri May 29 23:45:24 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.Collection;
+
+/**
+ * A callback used with the {@link BrokerDatabase#restoreMessages(QueueDescriptor, boolean, long, long, int, RestoreListener)} method.
+ */
+public interface RestoreListener<V> {
+
+    public void elementsRestored(Collection<RestoredElement<V>> restored);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoredElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoredElement.java?rev=780141&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoredElement.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/RestoredElement.java Fri May 29 23:45:24 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+/**
+ * A holder for queue elements loaded from the store.
+ * 
+ */
+public interface RestoredElement<V> {
+    /**
+     * @return Gets the restored element (possibly null if not requested)
+     * @throws Exception
+     */
+    public V getElement() throws Exception;
+
+    /**
+     * @return The element size.
+     */
+    int getElementSize();
+
+    /**
+     * @return A positive value indicating the expiration time if this
+     *         element is expirable.
+     */
+    long getExpiration();
+
+    /**
+     * Returns the sequence number of this element in the queue
+     * 
+     * @return the sequence number of this element
+     */
+    long getSequenceNumber();
+
+    /**
+     * Gets the tracking number of the stored message.
+     * 
+     * @return the tracking number of the stored message
+     */
+    long getStoreTracking();
+
+    /**
+     * Gets the next sequence number in the queue after this one or -1 if
+     * this is the last stored element
+     * 
+     * @return the next sequence number
+     */
+    long getNextSequenceNumber();
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/SaveableQueueElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/SaveableQueueElement.java?rev=780141&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/SaveableQueueElement.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/SaveableQueueElement.java Fri May 29 23:45:24 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+
+public interface SaveableQueueElement<V> {
+
+    /**
+     * @return the descriptor of the queue for which the element should be
+     *         saved.
+     */
+    public QueueDescriptor getQueueDescriptor();
+
+    /**
+     * @return the element to save.
+     */
+    public V getElement();
+
+    /**
+     * @return the sequence number of the element in the queue
+     * 
+     * 
+     */
+    public long getSequenceNumber();
+
+    /**
+     * @return a return value of true will cause {@link #notifySave()} to be
+     *         called when this element is persisted
+     */
+    public boolean requestSaveNotify();
+
+    /**
+     * Called when the element has been saved.
+     */
+    public void notifySave();
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/Store.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/Store.java Fri May 29 23:45:24 2009
@@ -23,8 +23,6 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 
 /**
  * Interface to persistently store and access data needed by the messaging
@@ -286,7 +284,7 @@
         /**
          * @return the descriptor for the queue.
          */
-        public QueueStore.QueueDescriptor getDescriptor();
+        public QueueDescriptor getDescriptor();
 
         /**
          * Gets the count of elements in this queue. Note that this does not
@@ -379,7 +377,7 @@
 
         public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
 
-        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
+        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
 
         public void transactionCommit(Buffer txid) throws KeyNotFoundException;
 
@@ -396,7 +394,7 @@
          *            The maximum number of queues to return
          * @return The list of queues.
          */
-        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueueName, int max);
+        public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueueName, int max);
 
         /**
          * Gets a list of queues for which
@@ -413,7 +411,7 @@
          *            The type of queue to consider
          * @return The list of queues.
          */
-        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueueName, int max);
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueueName, int max);
 
         /**
          * Adds a queue. If {@link QueueDescriptor#getParent()} is specified
@@ -425,7 +423,7 @@
          * @throws KeyNotFoundException
          *             if the descriptor specifies a non existent parent
          */
-        public void queueAdd(QueueStore.QueueDescriptor queue) throws KeyNotFoundException;
+        public void queueAdd(QueueDescriptor queue) throws KeyNotFoundException;
 
         /**
          * Deletes a queue and all of it's messages. If it has any child
@@ -434,7 +432,7 @@
          * @param queue
          *            The queue to delete
          */
-        public void queueRemove(QueueStore.QueueDescriptor queue);
+        public void queueRemove(QueueDescriptor queue);
 
         /**
          * Adds a reference to the message for the given queue. The associated
@@ -449,11 +447,11 @@
          *             If there is no message associated with
          *             {@link QueueRecord#getMessageKey()}
          */
-        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
+        public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
 
-        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
+        public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
 
-        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Fri May 29 23:45:24 2009
@@ -27,11 +27,11 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.DuplicateKeyException;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
@@ -83,7 +83,7 @@
     private BTreeIndex<Long, Long> trackingIndex;
 
     // Descriptor for this queue:
-    private QueueStore.QueueDescriptor descriptor;
+    private QueueDescriptor descriptor;
 
     // Child Partitions:
     private HashSet<DestinationEntity> partitions;
@@ -162,11 +162,11 @@
         return getMetaData(tx).count == 0 ? 0 : queueIndex.getLast(tx).getValue().getQueueKey();
     }
 
-    public void setQueueDescriptor(QueueStore.QueueDescriptor queue) {
+    public void setQueueDescriptor(QueueDescriptor queue) {
         descriptor = queue;
     }
 
-    public QueueStore.QueueDescriptor getDescriptor() {
+    public QueueDescriptor getDescriptor() {
         return descriptor;
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri May 29 23:45:24 2009
@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
@@ -49,7 +50,6 @@
 import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.Journal;
@@ -800,7 +800,7 @@
     }
 
     private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
-        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        QueueDescriptor qd = new QueueDescriptor();
         qd.setQueueName(command.getQueueName());
         qd.setApplicationType((short) command.getApplicationType());
         qd.setQueueType((short) command.getQueueType());
@@ -813,13 +813,13 @@
     }
 
     private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
-        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        QueueDescriptor qd = new QueueDescriptor();
         qd.setQueueName(command.getQueueName());
         rootEntity.queueRemove(tx, qd);
     }
 
     private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
-        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        QueueDescriptor qd = new QueueDescriptor();
         qd.setQueueName(command.getQueueName());
         DestinationEntity destination = rootEntity.getDestination(qd);
         if (destination != null) {
@@ -835,7 +835,7 @@
     }
 
     private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
-        QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+        QueueDescriptor qd = new QueueDescriptor();
         qd.setQueueName(command.getQueueName());
         DestinationEntity destination = rootEntity.getDestination(qd);
         if (destination != null) {
@@ -945,7 +945,7 @@
         // /////////////////////////////////////////////////////////////
         // Queue related methods.
         // /////////////////////////////////////////////////////////////
-        public void queueAdd(QueueStore.QueueDescriptor descriptor) {
+        public void queueAdd(QueueDescriptor descriptor) {
             QueueAddBean update = new QueueAddBean();
             update.setQueueName(descriptor.getQueueName());
             update.setQueueType(descriptor.getQueueType());
@@ -958,11 +958,11 @@
             updates.add(update);
         }
 
-        public void queueRemove(QueueStore.QueueDescriptor descriptor) {
+        public void queueRemove(QueueDescriptor descriptor) {
             updates.add(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
         }
 
-        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueue, int max) {
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueue, int max) {
             try {
                 return rootEntity.queueList(tx(), type, firstQueue, max);
             } catch (IOException e) {
@@ -970,7 +970,7 @@
             }
         }
 
-        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueue, int max) {
+        public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueue, int max) {
             try {
                 return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
             } catch (IOException e) {
@@ -978,7 +978,7 @@
             }
         }
 
-        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
+        public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
             QueueAddMessageBean bean = new QueueAddMessageBean();
             bean.setQueueName(queue.getQueueName());
             bean.setQueueKey(record.getQueueKey());
@@ -990,14 +990,14 @@
             updates.add(bean);
         }
 
-        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
+        public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
             bean.setMessageKey(messageKey);
             bean.setQueueName(queue.getQueueName());
             updates.add(bean);
         }
 
-        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
             DestinationEntity destination = rootEntity.getDestination(queue);
             if (destination == null) {
                 throw new KeyNotFoundException("queue key: " + queue);
@@ -1077,7 +1077,7 @@
             return null;
         }
 
-        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException {
+        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException {
         }
 
         public void transactionRollback(Buffer txid) throws KeyNotFoundException {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Fri May 29 23:45:24 2009
@@ -20,10 +20,10 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.VariableMarshaller;
@@ -123,10 +123,10 @@
         }
     };
 
-    public final static Marshaller<QueueStore.QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueStore.QueueDescriptor>() {
+    public final static Marshaller<QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueDescriptor>() {
 
-        public QueueStore.QueueDescriptor readPayload(DataInput dataIn) throws IOException {
-            QueueStore.QueueDescriptor descriptor = new QueueStore.QueueDescriptor();
+        public QueueDescriptor readPayload(DataInput dataIn) throws IOException {
+            QueueDescriptor descriptor = new QueueDescriptor();
             descriptor.setQueueType(dataIn.readShort());
             descriptor.setApplicationType(dataIn.readShort());
             descriptor.setQueueName(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
@@ -137,7 +137,7 @@
             return descriptor;
         }
 
-        public void writePayload(QueueStore.QueueDescriptor object, DataOutput dataOut) throws IOException {
+        public void writePayload(QueueDescriptor object, DataOutput dataOut) throws IOException {
             dataOut.writeShort(object.getQueueType());
             dataOut.writeShort(object.getApplicationType());
             ASCII_BUFFER_MARSHALLER.writePayload(object.getQueueName(), dataOut);

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Fri May 29 23:45:24 2009
@@ -25,12 +25,12 @@
 import java.util.TreeMap;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
@@ -174,7 +174,7 @@
      *             If the parent queue could not be found.
      */
     private void addToDestinationCache(DestinationEntity entity) throws KeyNotFoundException {
-        QueueStore.QueueDescriptor queue = entity.getDescriptor();
+        QueueDescriptor queue = entity.getDescriptor();
 
         // If loaded add a reference to us from the parent:
         if (loaded) {
@@ -191,7 +191,7 @@
     }
 
     private void removeFromDestinationCache(DestinationEntity entity) {
-        QueueStore.QueueDescriptor queue = entity.getDescriptor();
+        QueueDescriptor queue = entity.getDescriptor();
 
         // If the queue is loaded remove the parent reference:
         if (loaded) {
@@ -210,7 +210,7 @@
      */
     private void constructQueueHierarchy() throws KeyNotFoundException {
         for (DestinationEntity destination : destinations.values()) {
-            QueueStore.QueueDescriptor queue = destination.getDescriptor();
+            QueueDescriptor queue = destination.getDescriptor();
             if (queue.getParent() != null) {
                 DestinationEntity parent = destinations.get(queue.getParent());
                 if (parent == null) {
@@ -298,7 +298,7 @@
     // /////////////////////////////////////////////////////////////////
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
-    public void queueAdd(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+    public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
         if (destinationIndex.get(tx, queue.getQueueName()) == null) {
             DestinationEntity rc = new DestinationEntity();
             rc.setQueueDescriptor(queue);
@@ -313,7 +313,7 @@
         }
     }
 
-    public void queueRemove(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+    public void queueRemove(Transaction tx, QueueDescriptor queue) throws IOException {
         DestinationEntity destination = destinations.get(queue.getQueueName());
         if (destination != null) {
             // Remove the message references.
@@ -329,11 +329,11 @@
         }
     }
 
-    public DestinationEntity getDestination(QueueStore.QueueDescriptor queue) {
+    public DestinationEntity getDestination(QueueDescriptor queue) {
         return destinations.get(queue.getQueueName());
     }
 
-    public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueStore.QueueDescriptor firstQueue, int max) throws IOException {
+    public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueDescriptor firstQueue, int max) throws IOException {
         LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
         Collection<DestinationEntity> values = (firstQueue == null ? destinations.values() : destinations.tailMap(firstQueue.getQueueName()).values());
 
@@ -394,14 +394,14 @@
 
     private static class QueueQueryResultImpl implements QueueQueryResult {
 
-        QueueStore.QueueDescriptor desc;
+        QueueDescriptor desc;
         Collection<QueueQueryResult> partitions;
         long size;
         int count;
         long firstSequence;
         long lastSequence;
 
-        public QueueStore.QueueDescriptor getDescriptor() {
+        public QueueDescriptor getDescriptor() {
             return desc;
         }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri May 29 23:45:24 2009
@@ -28,6 +28,7 @@
 
 import java.io.File;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
@@ -89,17 +90,17 @@
     }
 
     static private class StoredQueue {
-        QueueStore.QueueDescriptor descriptor;
+        QueueDescriptor descriptor;
 
         TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>(Comparators.LONG_COMPARATOR);
         // Maps tracking to sequence number:
         HashMap<Long, Long> trackingMap = new HashMap<Long, Long>();
         int count = 0;
         long size = 0;
-        HashMap<QueueStore.QueueDescriptor, StoredQueue> partitions;
+        HashMap<QueueDescriptor, StoredQueue> partitions;
         StoredQueue parent;
 
-        StoredQueue(QueueStore.QueueDescriptor descriptor) {
+        StoredQueue(QueueDescriptor descriptor) {
             this.descriptor = descriptor.copy();
         }
 
@@ -161,7 +162,7 @@
 
         public void addPartition(StoredQueue child) {
             if (partitions == null) {
-                partitions = new HashMap<QueueStore.QueueDescriptor, StoredQueue>();
+                partitions = new HashMap<QueueDescriptor, StoredQueue>();
             }
 
             partitions.put(child.getDescriptor(), child);
@@ -191,7 +192,7 @@
             return descriptor.getQueueName();
         }
 
-        public QueueStore.QueueDescriptor getDescriptor() {
+        public QueueDescriptor getDescriptor() {
             return descriptor;
         }
 
@@ -216,10 +217,10 @@
     }
 
     static private class RemoveOp {
-        QueueStore.QueueDescriptor queue;
+        QueueDescriptor queue;
         Long messageKey;
 
-        public RemoveOp(QueueStore.QueueDescriptor queue, Long messageKey) {
+        public RemoveOp(QueueDescriptor queue, Long messageKey) {
             this.queue = queue;
             this.messageKey = messageKey;
         }
@@ -245,7 +246,7 @@
             adds.add(messageKey);
         }
 
-        public void removeMessage(QueueStore.QueueDescriptor queue, Long messageKey) {
+        public void removeMessage(QueueDescriptor queue, Long messageKey) {
             removes.add(new RemoveOp(queue, messageKey));
         }
     }
@@ -261,14 +262,14 @@
 
     private static class QueueQueryResultImpl implements QueueQueryResult {
 
-        QueueStore.QueueDescriptor desc;
+        QueueDescriptor desc;
         Collection<QueueQueryResult> partitions;
         long size;
         int count;
         long firstSequence;
         long lastSequence;
 
-        public QueueStore.QueueDescriptor getDescriptor() {
+        public QueueDescriptor getDescriptor() {
             return desc;
         }
 
@@ -336,7 +337,7 @@
         // //////////////////////////////////////////////////////////////////////////////
         // Queue related methods.
         // ///////////////////////////////////////////////////////////////////////////////
-        public void queueAdd(QueueStore.QueueDescriptor desc) throws KeyNotFoundException {
+        public void queueAdd(QueueDescriptor desc) throws KeyNotFoundException {
             StoredQueue queue = queues.get(desc.getQueueName());
             if (queue == null) {
                 queue = new StoredQueue(desc);
@@ -359,7 +360,7 @@
             }
         }
 
-        public void queueRemove(QueueStore.QueueDescriptor desc) {
+        public void queueRemove(QueueDescriptor desc) {
             StoredQueue queue = queues.get(desc.getQueueName());
             if (queue != null) {
                 // Remove message references:
@@ -377,7 +378,7 @@
                 Iterator<StoredQueue> partitions = queue.getPartitions();
                 if (partitions != null) {
                     while (partitions.hasNext()) {
-                        QueueStore.QueueDescriptor child = partitions.next().getDescriptor();
+                        QueueDescriptor child = partitions.next().getDescriptor();
                         queueRemove(child);
                     }
                 }
@@ -388,15 +389,15 @@
         }
 
         // Queue related methods.
-        public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueue, int max) {
+        public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueue, int max) {
             return queueListInternal(firstQueue, type, max);
         }
 
-        public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueue, int max) {
+        public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueue, int max) {
             return queueListInternal(firstQueue, (short) -1, max);
         }
 
-        private Iterator<QueueQueryResult> queueListInternal(QueueStore.QueueDescriptor firstQueue, short type, int max) {
+        private Iterator<QueueQueryResult> queueListInternal(QueueDescriptor firstQueue, short type, int max) {
             Collection<StoredQueue> tailResults;
             LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
             if (firstQueue == null) {
@@ -418,7 +419,7 @@
             return results.iterator();
         }
 
-        public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
+        public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
             get(queues, queue.getQueueName()).add(record);
             MessageRecordHolder holder = messages.get(record.getMessageKey());
             if (holder != null) {
@@ -426,7 +427,7 @@
             }
         }
 
-        public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long msgKey) throws KeyNotFoundException {
+        public void queueRemoveMessage(QueueDescriptor queue, Long msgKey) throws KeyNotFoundException {
             if (get(queues, queue.getQueueName()).remove(msgKey)) {
                 deleteMessageReference(msgKey);
             }
@@ -442,7 +443,7 @@
             }
         }
 
-        public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+        public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
             return get(queues, queue.getQueueName()).list(firstQueueKey, maxQueueKey, max);
         }
 
@@ -534,7 +535,7 @@
             }
         }
 
-        public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
+        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
             get(transactions, txid).removeMessage(queue, messageKey);
             MessageRecordHolder holder = messages.get(messageKey);
             if (holder != null) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/CursoredQueue.java Fri May 29 23:45:24 2009
@@ -24,14 +24,14 @@
 import java.util.LinkedList;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.RestoreListener;
+import org.apache.activemq.broker.store.RestoredElement;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.RestoreListener;
-import org.apache.activemq.queue.QueueStore.RestoredElement;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 import org.apache.activemq.util.Comparators;
 import org.apache.activemq.util.Mapper;
@@ -1315,7 +1315,7 @@
      */
     private class ElementLoader implements RestoreListener<V> {
 
-        private LinkedList<QueueStore.RestoredElement<V>> fromDatabase = new LinkedList<QueueStore.RestoredElement<V>>();
+        private LinkedList<RestoredElement<V>> fromDatabase = new LinkedList<RestoredElement<V>>();
         private final HashMap<Long, HashSet<Cursor<V>>> reservedBlocks = new HashMap<Long, HashSet<Cursor<V>>>();
         private final HashSet<Cursor<V>> pagingCursors = new HashSet<Cursor<V>>();
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Fri May 29 23:45:24 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.queue;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
@@ -27,7 +28,6 @@
 import org.apache.activemq.queue.CursoredQueue.Cursor;
 import org.apache.activemq.queue.CursoredQueue.QueueElement;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 import org.apache.activemq.util.Mapper;
 
@@ -57,7 +57,7 @@
      */
     public ExclusivePersistentQueue(String name, IFlowSizeLimiter<E> limiter) {
         super(name);
-        this.queueDescriptor = new QueueStore.QueueDescriptor();
+        this.queueDescriptor = new QueueDescriptor();
         this.limiter = limiter;
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.EXCLUSIVE);

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri May 29 23:45:24 2009
@@ -20,12 +20,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.util.Mapper;
 
 abstract public class PartitionedQueue<K, V> extends AbstractLimitedFlowResource<V> implements IPartitionedQueue<K, V> {
@@ -37,17 +37,17 @@
     protected IDispatcher dispatcher;
     private boolean started;
     private boolean shutdown = false;
-    protected QueueStore.QueueDescriptor queueDescriptor;
+    protected QueueDescriptor queueDescriptor;
     private int basePriority = 0;
 
     public PartitionedQueue(String name) {
         super(name);
-        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor = new QueueDescriptor();
         queueDescriptor.setQueueName(new AsciiBuffer(getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.PARTITIONED);
     }
 
-    public QueueStore.QueueDescriptor getDescriptor() {
+    public QueueDescriptor getDescriptor() {
         return queueDescriptor;
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/QueueStore.java Fri May 29 23:45:24 2009
@@ -16,225 +16,15 @@
  */
 package org.apache.activemq.queue;
 
-import java.util.Collection;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.RestoreListener;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface QueueStore<K, V> {
 
-    public interface SaveableQueueElement<V> {
-
-        /**
-         * @return the descriptor of the queue for which the element should be
-         *         saved.
-         */
-        public QueueDescriptor getQueueDescriptor();
-
-        /**
-         * @return the element to save.
-         */
-        public V getElement();
-
-        /**
-         * @return the sequence number of the element in the queue
-         * 
-         * 
-         */
-        public long getSequenceNumber();
-
-        /**
-         * @return a return value of true will cause {@link #notifySave()} to
-         *         called when this element is persisted
-         */
-        public boolean requestSaveNotify();
-
-        /**
-         * Called when the element has been saved.
-         */
-        public void notifySave();
-
-    }
-
-    /**
-     * A holder for queue elements loaded from the store.
-     * 
-     */
-    public interface RestoredElement<V> {
-        /**
-         * @return Gets the restored element (possibly null if not requested)
-         * @throws Exception
-         */
-        public V getElement() throws Exception;
-
-        /**
-         * @return The element size.
-         */
-        int getElementSize();
-
-        /**
-         * @return A positive values indicating the expiration time if this
-         *         element is expirable.
-         */
-        long getExpiration();
-
-        /**
-         * Returns the sequence number of this element in the queue
-         * 
-         * @return the sequence number of this element
-         */
-        long getSequenceNumber();
-
-        /**
-         * Gets the tracking number of the stored message.
-         * 
-         * @return the next sequence number
-         */
-        long getStoreTracking();
-
-        /**
-         * Gets the next sequence number in the queue after this one or -1 if
-         * this is the last stored element
-         * 
-         * @return the next sequence number
-         */
-        long getNextSequenceNumber();
-    }
-
-    /**
-     * A callback to be used with {@link #elementsRestored(Collection)} to pass
-     * the results of a call to
-     * {@link QueueStore#restoreQueueElements(QueueDescriptor, long, long, int, RestoreListener)}
-     */
-    public interface RestoreListener<V> {
-
-        public void elementsRestored(Collection<RestoredElement<V>> restored);
-    }
-
-    public static class QueueDescriptor {
-
-        public static final short SHARED = 0;
-        public static final short SHARED_PRIORITY = 1;
-        public static final short PARTITIONED = 2;
-        public static final short EXCLUSIVE = 4;
-        public static final short EXCLUSIVE_PRIORITY = 5;
-        
-        AsciiBuffer queueName;
-        AsciiBuffer parent;
-        int partitionKey;
-        short applicationType;
-        short queueType = SHARED;
-
-        public QueueDescriptor() {
-        }
-
-        public QueueDescriptor(QueueDescriptor toCopy) {
-            if (toCopy == null) {
-                return;
-            }
-            queueName = toCopy.queueName;
-            applicationType = toCopy.applicationType;
-            queueType = toCopy.queueType;
-            partitionKey = toCopy.partitionKey;
-            parent = toCopy.parent;
-        }
-
-        public QueueDescriptor copy() {
-            return new QueueDescriptor(this);
-        }
-
-        public int getPartitionKey() {
-            return partitionKey;
-        }
-
-        public void setPartitionId(int key) {
-            this.partitionKey = key;
-        }
-
-        /**
-         * Sets the queue type which is useful for querying of queues. The value
-         * must not be less than 0.
-         * 
-         * @param type
-         *            The type of the queue.
-         */
-        public void setApplicationType(short type) {
-            if (type < 0) {
-                throw new IllegalArgumentException();
-            }
-            applicationType = type;
-        }
-
-        /**
-         * @param type
-         *            The type of the queue.
-         */
-        public short getApplicationType() {
-            return applicationType;
-        }
-
-        public short getQueueType() {
-            return queueType;
-        }
-
-        public void setQueueType(short type) {
-            queueType = type;
-        }
-
-        /**
-         * If this queue is a partition of a parent queue, this should be set to
-         * the parent queue's name.
-         * 
-         * @return The parent queue's name
-         */
-        public AsciiBuffer getParent() {
-            return parent;
-        }
-
-        /**
-         * If this queue is a partition of a parent queue, this should be set to
-         * the parent queue's name.
-         */
-        public void setParent(AsciiBuffer parent) {
-            this.parent = parent;
-        }
-
-        public AsciiBuffer getQueueName() {
-            return queueName;
-        }
-
-        public void setQueueName(AsciiBuffer queueName) {
-            this.queueName = queueName;
-        }
-
-        public int hashCode() {
-            return queueName.hashCode();
-        }
-
-        public boolean equals(Object o) {
-            if (o == null) {
-                return false;
-            }
-            if (o == this) {
-                return true;
-            }
-
-            if (o instanceof QueueDescriptor) {
-                return equals((QueueDescriptor) o);
-            } else {
-                return false;
-            }
-        }
-
-        public boolean equals(QueueDescriptor qd) {
-            if (qd.queueName.equals(queueName)) {
-                return true;
-            }
-            return false;
-        }
-    }
-    
     public interface PersistentQueue<K, V>
     {
         /**
@@ -276,7 +66,7 @@
          * 
          * @return The queue descriptor.
          */
-        public QueueStore.QueueDescriptor getDescriptor();
+        public QueueDescriptor getDescriptor();
 
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri May 29 23:45:24 2009
@@ -20,12 +20,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.util.Mapper;
 
 public class SharedPriorityQueue<K, V> extends AbstractLimitedFlowResource<V> implements IPartitionedQueue<K, V> {
@@ -40,14 +40,14 @@
     private QueueStore<K, V> store;
     private PersistencePolicy<V> persistencePolicy;
     private boolean started;
-    private QueueStore.QueueDescriptor queueDescriptor;
+    private QueueDescriptor queueDescriptor;
     private Mapper<Long, V> expirationMapper;
     private int basePriority = 0;
     private boolean shutdown = false;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
-        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor = new QueueDescriptor();
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.SHARED_PRIORITY);
         this.limiter = limiter;
@@ -234,7 +234,7 @@
         }
     }
 
-    public QueueStore.QueueDescriptor getDescriptor() {
+    public QueueDescriptor getDescriptor() {
         return queueDescriptor;
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri May 29 23:45:24 2009
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
@@ -30,7 +31,6 @@
 import org.apache.activemq.queue.CursoredQueue.Cursor;
 import org.apache.activemq.queue.CursoredQueue.CursorReadyListener;
 import org.apache.activemq.queue.CursoredQueue.QueueElement;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 import org.apache.activemq.util.Mapper;
 import org.apache.kahadb.util.LinkedNode;
@@ -57,7 +57,7 @@
     private FlowController<V> inputController;
     private final IFlowSizeLimiter<V> sizeLimiter;
 
-    private final QueueStore.QueueDescriptor queueDescriptor;
+    private final QueueDescriptor queueDescriptor;
 
     private static final int ACCEPTED = 0;
     private static final int NO_MATCH = 1;
@@ -98,7 +98,7 @@
         this.mutex = mutex == null ? new Object() : mutex;
 
         flow = new Flow(getResourceName(), false);
-        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor = new QueueDescriptor();
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.SHARED);
         this.sizeLimiter = sizeLimiter;
@@ -191,7 +191,7 @@
         }
     }
 
-    public QueueStore.QueueDescriptor getDescriptor() {
+    public QueueDescriptor getDescriptor() {
         return queueDescriptor;
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Fri May 29 23:45:24 2009
@@ -23,6 +23,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
@@ -106,7 +107,7 @@
 
     };
 
-    private QueueStore.QueueDescriptor queueDescriptor;
+    private QueueDescriptor queueDescriptor;
 
     public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter) {
         this(name, limiter, new Object());
@@ -123,7 +124,7 @@
      */
     public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
         super(name);
-        queueDescriptor = new QueueStore.QueueDescriptor();
+        queueDescriptor = new QueueDescriptor();
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         this.mutex = mutex;
         Flow flow = new Flow(name, false);
@@ -266,7 +267,7 @@
         return accepted ? null : matches;
     }
 
-    public QueueStore.QueueDescriptor getDescriptor() {
+    public QueueDescriptor getDescriptor() {
         return queueDescriptor;
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Fri May 29 23:45:24 2009
@@ -31,7 +31,6 @@
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
 
 public abstract class StorePerformanceBase extends TestCase {
 
@@ -40,7 +39,7 @@
     
     
     private Store store;
-    private QueueStore.QueueDescriptor queueId;
+    private QueueDescriptor queueId;
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -55,7 +54,7 @@
         store = createStore();
         store.start();
         
-        queueId = new QueueStore.QueueDescriptor();
+        queueId = new QueueDescriptor();
         queueId.setQueueName(new AsciiBuffer("test"));
         store.execute(new VoidCallback<Exception>() {
             @Override

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Fri May 29 23:45:24 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
 
 public abstract class StoreTestBase extends TestCase {
 
@@ -78,7 +77,7 @@
     }
 
     public void testQueueAdd() throws Exception {
-        final QueueStore.QueueDescriptor expected = new QueueStore.QueueDescriptor();
+        final QueueDescriptor expected = new QueueDescriptor();
         expected.setQueueName(new AsciiBuffer("testQueue"));
         expected.setApplicationType((short)1);
         
@@ -105,7 +104,7 @@
     }
     
     public void testQueueMessageAdd() throws Exception {
-        final QueueStore.QueueDescriptor queue = new QueueStore.QueueDescriptor();
+        final QueueDescriptor queue = new QueueDescriptor();
         queue.setQueueName(new AsciiBuffer("testQueue"));
         queue.setApplicationType((short)1);
         
@@ -146,7 +145,7 @@
         }
     }
 
-    private void checkQueue(final QueueStore.QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception
+    private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception
     {
         store.execute(new VoidCallback<Exception>() {
             @Override
@@ -161,7 +160,7 @@
         }, null);
     }
     
-    private void checkMessageRestore(final QueueStore.QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message ) throws FatalStoreException, Exception
+    private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message ) throws FatalStoreException, Exception
     {
         store.execute(new VoidCallback<Exception>() {
             @Override
@@ -182,7 +181,7 @@
             store.execute(new VoidCallback<Exception>() {
                 @Override
                 public void run(Session session) throws Exception {
-                    QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+                    QueueDescriptor qd = new QueueDescriptor();
                     qd.setQueueName(new AsciiBuffer("test"));
                     session.queueAdd(qd);
                     throw new IOException("Expected");
@@ -214,7 +213,7 @@
         assertEquals(expected.getSize(), actual.getSize());
     }
     
-    static void assertEquals(QueueStore.QueueDescriptor expected, QueueStore.QueueDescriptor actual) {
+    static void assertEquals(QueueDescriptor expected, QueueDescriptor actual) {
         assertEquals(expected.getParent(), actual.getParent());
         assertEquals(expected.getQueueType(), actual.getQueueType());
         assertEquals(expected.getApplicationType(), actual.getApplicationType());

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=780141&r1=780140&r2=780141&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/MockQueue.java Fri May 29 23:45:24 2009
@@ -5,6 +5,9 @@
 
 import java.util.HashMap;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.store.RestoreListener;
+import org.apache.activemq.broker.store.SaveableQueueElement;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
@@ -197,7 +200,7 @@
 
         }
 
-        public final void deleteQueueElement(QueueStore.QueueDescriptor descriptor, Message elem) {
+        public final void deleteQueueElement(QueueDescriptor descriptor, Message elem) {
 
         }
 
@@ -209,15 +212,15 @@
             // Noop;
         }
 
-        public final void restoreQueueElements(QueueStore.QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, QueueStore.RestoreListener<Message> listener) {
+        public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<Message> listener) {
             throw new UnsupportedOperationException("Mock broker doesn't support persistence");
         }
 
-        public final void addQueue(QueueStore.QueueDescriptor queue) {
+        public final void addQueue(QueueDescriptor queue) {
 
         }
 
-        public final void deleteQueue(QueueStore.QueueDescriptor queue) {
+        public final void deleteQueue(QueueDescriptor queue) {
 
         }
 



Mime
View raw message