activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r636615 [1/2] - in /activemq/sandbox/activemq-router/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/router/api/ main/java/org/apache/activemq/broker/router/core/ main/java/org/apache/activemq/broker/router/c...
Date Thu, 13 Mar 2008 03:16:23 GMT
Author: chirino
Date: Wed Mar 12 20:16:19 2008
New Revision: 636615

URL: http://svn.apache.org/viewvc?rev=636615&view=rev
Log:
Several fixes... added queue browser support and initial pass at temp destinations

Added:
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserStoreSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserSubscription.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexBasicTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java
      - copied, changed from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java
      - copied, changed from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java
      - copied, changed from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java
Removed:
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java
Modified:
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java Wed Mar 12 20:16:19 2008
@@ -246,7 +246,7 @@
         requestContext.connectionContext = producerExchange.getConnectionContext();
 
         for (Destination destination : destinations) {
-            destination.enqueue(requestContext, message, completionHandler);
+            destination.add(requestContext, message, completionHandler);
         }
     }
 

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java Wed Mar 12 20:16:19 2008
@@ -51,7 +51,13 @@
 
     public void addStoreSubscription(StoreSubscription destination);
 
-    public void removeStoreSubscription(StoreSubscription destination);
+    /**
+     * 
+     * @param destination
+     * @return any messages that have not been acknowledged by the consumer.  Use to replay the messages
+     *         to other consumers.
+     */
+    public List<Long> removeStoreSubscription(StoreSubscription destination);
 
     public void wakeup();
 

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java Wed Mar 12 20:16:19 2008
@@ -41,7 +41,7 @@
 
     public ActiveMQDestination getName();
 
-    public void enqueue(RequestContext requestContext, Message message, Runnable onStored) throws Exception;
+    public void add(RequestContext requestContext, Message message, Runnable onStored) throws Exception;
 
     public boolean lockForDispatch(ClientSubscription source, CacheEntry ref);
 

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java Wed Mar 12 20:16:19 2008
@@ -16,11 +16,15 @@
  */
 package org.apache.activemq.broker.router.api;
 
+import org.apache.activemq.broker.router.store.api.DataStore;
+
 /**
  * 
  * @author chirino
  */
 public interface StoreSubscription {
     public void wakeup();
+
+    public DataStore getDataStore();
 
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java Wed Mar 12 20:16:19 2008
@@ -77,7 +77,7 @@
         return systemUsage;
     }
 
-    public void enqueue(RequestContext ctx, Message message, Runnable onStored) throws Exception {
+    public void add(RequestContext ctx, Message message, Runnable onStored) throws Exception {
         while (!systemUsage.getMemoryUsage().waitForSpace(100)) {
             if (ctx.connectionContext.getStopping().get()) {
                 throw new IOException("Connection closed, enqueue aborted.");
@@ -85,7 +85,7 @@
         }
         enqueueCounter.incrementAndGet();
         QualityOfService qos = chooseQosFor(message);
-        qos.enqueue(ctx, message, onStored);
+        qos.add(ctx, message, onStored);
     }
 
     public void addSubscription(ClientSubscription subscription) throws Exception {

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java Wed Mar 12 20:16:19 2008
@@ -44,30 +44,30 @@
 
     final private Log log;
 
-    private final int maxPrefetch;
-    private final DataStore dataStore;
-    private final ClientSubscription clientSubscription;
-    private final ReferenceStore store;
-    private final LinkedList<CacheEntry> prefetch;
+    protected final int maxPrefetch;
+    protected final DataStore dataStore;
+    protected final ClientSubscription clientSubscription;
+    protected final ReferenceStore referenceStore;
+    protected final LinkedList<CacheEntry> prefetch = new LinkedList<CacheEntry>();
+    protected final LinkedList<CacheEntry> redeliveries = new LinkedList<CacheEntry>();
 
     // Used to track recovery of the subscription...
-    private boolean recoveringFromDataStore;
-    private CacheEntry lastRecover;
-    private CacheEntry recoverPosition;
-    private CacheEntry storePosition;
-
-    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-    private boolean enqueueToStore;
-    private AtomicBoolean dequeueFromStore = new AtomicBoolean();
+    protected boolean recoveringFromDataStore;
+    protected CacheEntry lastRecover;
+    protected CacheEntry recoverPosition;
+    protected CacheEntry storePosition;
+
+    protected final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    protected boolean enqueueToStore;
+    protected AtomicBoolean dequeueFromStore = new AtomicBoolean();
 
-    private Selectable<Runnable> storePumpTask;
+    protected Selectable<Runnable> storePumpTask;
 
-    public BroadcastStoreSubscription(DataStore dataStore, ReferenceStore store, ClientSubscription clientSubscription) {
+    public BroadcastStoreSubscription(DataStore dataStore, ReferenceStore referenceStore, ClientSubscription clientSubscription) {
         this.dataStore = dataStore;
-        this.store = store;
+        this.referenceStore = referenceStore;
         this.clientSubscription = clientSubscription;
         this.maxPrefetch = Math.max(1, clientSubscription.getInfo().getPrefetchSize());
-        this.prefetch = new LinkedList<CacheEntry>();
         this.log = LogFactory.getLog(BroadcastStoreSubscription.class.getName() + ".consumer=" + clientSubscription.getInfo().getConsumerId());
     }
 
@@ -75,8 +75,8 @@
         return clientSubscription;
     }
 
-    public ReferenceStore getStore() {
-        return store;
+    public ReferenceStore getReferenceStore() {
+        return referenceStore;
     }
 
     public DataStore getDataStore() {
@@ -104,14 +104,47 @@
     public void stop() throws Exception {
         storePumpTask.close();
     }
+    
+    
+    public void redeliver(CacheEntry cacheEntry) throws Exception {
+        rwl.readLock().lock();
+        try {
+            if (enqueueToStore) {
+                synchronized(redeliveries) {
+                    cacheEntry.load();
+                    redeliveries.add(cacheEntry);
+                }
+            } else {
+                if (!clientSubscription.offer(this, cacheEntry)) {
+                    cacheEntry.load();
+                    redeliveries.add(cacheEntry);
+                    rwl.readLock().unlock();
+                    rwl.writeLock().lock();
+                    try {
+                        if (!enqueueToStore) {
+                            log.debug("Next is full..  spooling to store");
+                            enqueueToStore = true;
+                            storePumpTask.setEnabled(true);
+                        }
+                    } finally {
+                        rwl.readLock().lock();
+                        rwl.writeLock().unlock();
+                    }
+                }
+            }
+        } finally {
+            rwl.readLock().unlock();
+        }
+    }
 
-    public boolean offer(CacheEntry cacheEntry) throws Exception {
+    public void add(CacheEntry cacheEntry) throws Exception {
         rwl.readLock().lock();
         try {
             if (enqueueToStore) {
-                store.addReference(cacheEntry);
+                referenceStore.addReference(cacheEntry);
             } else {
                 if (!clientSubscription.offer(this, cacheEntry)) {
+                    referenceStore.addReference(cacheEntry);
                     rwl.readLock().unlock();
                     rwl.writeLock().lock();
                     try {
@@ -124,14 +157,11 @@
                         rwl.readLock().lock();
                         rwl.writeLock().unlock();
                     }
-                    store.addReference(cacheEntry);
-                    // wakeup();
                 }
             }
         } finally {
             rwl.readLock().unlock();
         }
-        return true;
     }
 
     /**
@@ -149,7 +179,7 @@
         for (;;) {
             if (prefetch.isEmpty()) {
                 long start = System.currentTimeMillis();
-                load(prefetch, maxPrefetch - prefetch.size());
+                load(prefetch, maxPrefetch);
                 long end = System.currentTimeMillis();
                 log.debug("Loaded Prefetch: " + prefetch.size() + " msgs in " + (end - start) + " msec");
             }
@@ -158,7 +188,7 @@
                 // try to enqueue directly to the next guy.
                 rwl.writeLock().lock();
                 try {
-                    load(prefetch, maxPrefetch - prefetch.size());
+                    load(prefetch, maxPrefetch);
                     if (prefetch.isEmpty()) {
                         log.debug("store is empty.. will enqueue directly");
                         enqueueToStore = false;
@@ -219,23 +249,45 @@
      * @return
      * @throws Exception
      */
-    public void load(LinkedList<CacheEntry> target, int max) throws Exception {
-        int counter = 0;
+    protected void load(LinkedList<CacheEntry> target, int max) throws Exception {
+
+        // Load from the re-delivery list first..        
+        loadFromRedeliveries(target, max);        
 
         // We have to keep looping because the subscription might not
         // be interested in all the messages we initially load.
-        while (recoveringFromDataStore && counter < max) {
+        loadFromDataStore(target, max);
 
-            // This also loads the message into memory.
+        loadFromReferenceStore(target, max);
+    }
+
+    protected void loadFromReferenceStore(LinkedList<CacheEntry> target, int max) throws Exception {
+        if ( target.size() < max ) {
+            long start = System.currentTimeMillis();
+            int c = max - target.size();
+            log.debug("Loading up to " + c + " msgs");
+            List<CacheEntry> data = referenceStore.remove(storePosition, null, c);
+            long end = System.currentTimeMillis();
+            log.debug("Loaded: " + data.size() + " msgs in " + (end - start) + " msec");
+            if (data.size() > 0) {
+                storePosition = data.get(data.size() - 1);
+                target.addAll(data);
+            }
+        }
+    }
+
+    protected void loadFromDataStore(LinkedList<CacheEntry> target, int max) throws Exception {
+        while (recoveringFromDataStore && target.size() < max) {
 
+            // This also loads the message into memory.
             long start = System.currentTimeMillis();
-            int c = max - counter;
+            int c = max - target.size();
             log.debug("Loading up to " + c + " msgs");
             List<CacheEntry> data = dataStore.load(recoverPosition, lastRecover, c);
             long end = System.currentTimeMillis();
             log.debug("Loaded: " + data.size() + " msgs in " + (end - start) + " msec");
 
-            if (data.size() < max - counter) {
+            if (data.size() < max - target.size()) {
                 recoveringFromDataStore = false;
             }
 
@@ -256,7 +308,6 @@
                     mec.setDestination(name);
                     mec.setMessageReference(message);
                     if (message != null && clientSubscription.matches(message, mec)) {
-                        counter++;
                         target.add(mr);
                     } else {
                         // If it did not match.. then unload it.
@@ -269,25 +320,21 @@
                 }
             }
         }
+    }
 
-        if (counter < max) {
-            long start = System.currentTimeMillis();
-            int c = max - counter;
-            log.debug("Loading up to " + c + " msgs");
-            List<CacheEntry> data = store.remove(storePosition, null, c);
-            long end = System.currentTimeMillis();
-            log.debug("Loaded: " + data.size() + " msgs in " + (end - start) + " msec");
-            if (data.size() > 0) {
-                storePosition = data.get(data.size() - 1);
-                counter += data.size();
-                target.addAll(data);
+    protected void loadFromRedeliveries(LinkedList<CacheEntry> target, int max) {
+        synchronized(redeliveries) {
+            while ( !redeliveries.isEmpty() && target.size() < max ) {
+                target.add(redeliveries.removeFirst());
             }
         }
     }
 
     public void recoverUntil(CacheEntry until) throws Exception {
         recoveringFromDataStore = true;
-        this.recoverPosition = store.getLastAddedEntry();
+        if( referenceStore!=null ) {
+            this.recoverPosition = referenceStore.getLastAddedEntry();
+        }
         this.lastRecover = until;
         this.enqueueToStore = true;
     }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java Wed Mar 12 20:16:19 2008
@@ -103,13 +103,15 @@
         }
     }
 
-    public void removeStoreSubscription(StoreSubscription destination) {
+    public List<Long> removeStoreSubscription(StoreSubscription destination) {
+        ArrayList<Long> rc = new ArrayList<Long>(0);
         synchronized (prefetchWindowMutex) {
             // We access stores via copy on write since writes are infrequent
             // but need to do async reads of it.
             stores = new ArrayList<StoreSubscription>(stores);
             stores.remove(destination);
-        }
+        }        
+        return rc;
     }
 
     public void setClientConnection(ClientConnection clientConnection) {
@@ -162,14 +164,15 @@
 
     public void start() throws Exception {
         synchronized (prefetchWindowMutex) {
-            started = true;
             router.addSubscription(this);
-            if (!stores.isEmpty()) {
-                for (StoreSubscription store : stores) {
-                    store.wakeup();
-                }
-            }
+//            if (!stores.isEmpty()) {
+//                for (StoreSubscription store : stores) {
+//                    store.wakeup();
+//                }
+//            }
+            started = true;
         }
+        wakeup();
     }
 
     public void stop() throws Exception {

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java Wed Mar 12 20:16:19 2008
@@ -23,6 +23,8 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.router.api.ClientSubscription;
 import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.api.StoreSubscription;
+import org.apache.activemq.broker.router.core.queue.BrowserStoreSubscription;
 import org.apache.activemq.broker.router.store.api.CacheEntry;
 import org.apache.activemq.broker.router.store.api.DataStore;
 import org.apache.activemq.broker.router.store.api.ReferenceStore;
@@ -40,8 +42,9 @@
     private DataStore dataStore;
     public long lastId = 0;
     public CacheEntry lastAddedMessage;
-
-    public void enqueue(RequestContext ctx, Message message, Runnable onStored) throws Exception {
+    public boolean redeliver;
+    
+    public void add(RequestContext ctx, Message message, Runnable onStored) throws Exception {
 
         CacheEntry reference = null;
         LinkedList<BroadcastStoreSubscription> subs;
@@ -63,7 +66,7 @@
                 // dispatch the message or put it on the pending message
                 // store.
                 if (subscription.matches(message, ec)) {
-                    subscription.offer(reference);
+                    subscription.add(reference);
                 }
             }
         } catch (Exception e) {
@@ -76,36 +79,100 @@
         }
 
     }
-
-    public void addSubscription(ClientSubscription subscription) throws Exception {
+    
+    
+    /** 
+     * Queue subscriptions implement this to re-deliver messages to other consumers.
+     * @param messageIds
+     * @throws Exception 
+     */
+    protected void redeliver(List<Long> messageIds) throws Exception {
+                
+        LinkedList<BroadcastStoreSubscription> subs;        
         synchronized (this) {
-            subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
-            ReferenceStore store = dataStore.addStore(TRANSIENT_PREFIX + subscription.getInfo().getConsumerId());
-            BroadcastStoreSubscription ss = new BroadcastStoreSubscription(dataStore, store, subscription);
-            if (lastAddedMessage != null) {
-                ss.recoverUntil(lastAddedMessage);
+            subs = subscriptions;
+        }
+        
+        List<CacheEntry> references = dataStore.load(messageIds);            
+        for (CacheEntry reference : references) {                
+            Message message = reference.getMessage();
+            if( message == null ) {
+                continue;
             }
-            subscriptions.add(ss);
-            subscription.addStoreSubscription(ss);
-            ss.start();
-            subscription.wakeup();
+            
+            try {
+                
+                // Wake up the subscriptions so that they push messages out to the
+                // clients.
+                MessageEvaluationContext ec = new MessageEvaluationContext();
+                ec.setDestination(dataStore.getDestination().getName());
+                ec.setMessageReference(message);
+
+                for (BroadcastStoreSubscription subscription : subs) {
+                    // Lock the subscription state so that we can properly either
+                    // dispatch the message or put it on the pending message
+                    // store.
+                    try {
+                        if (subscription.matches(message, ec)) {
+                            subscription.redeliver(reference);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                
+            } finally {
+                reference.unload();
+            }                
+        }
+            
+    }
+
+    public void addSubscription(ClientSubscription clientSubscription) throws Exception {
+        synchronized (this) {            
+            BroadcastStoreSubscription storeSubscription;
+            if( clientSubscription.getInfo().isBrowser() ) {
+                if (lastAddedMessage == null) {
+                    // No messages have been added yet.  No need to browse this guy
+                    return;
+                }            
+                storeSubscription = new BrowserStoreSubscription(dataStore, null, clientSubscription);
+            } else {
+                ReferenceStore store = dataStore.addStore(TRANSIENT_PREFIX + clientSubscription.getInfo().getConsumerId());
+                storeSubscription = new BroadcastStoreSubscription(dataStore, store, clientSubscription);
+                subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
+                subscriptions.add(storeSubscription);
+            }            
+            if (lastAddedMessage != null) {
+                storeSubscription.recoverUntil(lastAddedMessage);
+            }            
+            clientSubscription.addStoreSubscription(storeSubscription);
+            storeSubscription.start();
+            clientSubscription.wakeup();
         }
     }
 
     public void removeSubscription(ClientSubscription subscription) throws Exception {
+        List<Long> messages=null;
         synchronized (this) {
-            subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
-            for (Iterator<BroadcastStoreSubscription> iterator = subscriptions.iterator(); iterator.hasNext();) {
-                BroadcastStoreSubscription ss = iterator.next();
-                if (ss.getClientSubscription() == subscription) {
-                    ss.getDataStore().removeStore(ss.getStore());
-                    iterator.remove();
-                    break;
+            if( !subscription.getInfo().isBrowser() ) {
+                subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
+                for (Iterator<BroadcastStoreSubscription> iterator = subscriptions.iterator(); iterator.hasNext();) {
+                    BroadcastStoreSubscription ss = iterator.next();
+                    if (ss.getClientSubscription() == subscription) {
+                        messages = subscription.removeStoreSubscription(ss);
+                        iterator.remove();
+                        ss.getDataStore().removeStore(ss.getReferenceStore());
+                        break;
+                    }
                 }
             }
         }
+        if( redeliver && messages!=null  ) {
+            redeliver(messages);
+        }
     }
-
+    
     public DataStore getDataStore() {
         return dataStore;
     }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java Wed Mar 12 20:16:19 2008
@@ -26,6 +26,7 @@
 import org.apache.activemq.broker.router.api.DestinationManager;
 import org.apache.activemq.broker.router.core.SimpleDestinationManager.DestinationManagerCallback;
 import org.apache.activemq.broker.router.core.queue.BroadcastQueue;
+import org.apache.activemq.broker.router.core.queue.BrowserSubscription;
 import org.apache.activemq.broker.router.core.queue.QueueSubscription;
 import org.apache.activemq.broker.router.core.topic.Topic;
 import org.apache.activemq.broker.router.core.topic.TopicSubscription;
@@ -34,6 +35,8 @@
 import org.apache.activemq.broker.router.util.SelectorThreadPool;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -99,6 +102,15 @@
     public Topic createTopic(ActiveMQTopic name) {
         return new Topic(this, name);
     }
+    
+    public Destination createTempQueue(ActiveMQTempQueue name) {
+        return new BroadcastQueue(this, name);
+    }
+
+    public Destination createTempTopic(ActiveMQTempTopic name) {
+        return new Topic(this, name);
+    }
+
 
     /**
      * Matches up existing subscriptions with this destination.
@@ -153,7 +165,11 @@
         ClientSubscription rc = null;
         ActiveMQDestination name = consumerInfo.getDestination();
         if (name.isQueue()) {
-            rc = new QueueSubscription(this, consumerInfo);
+            if( consumerInfo.isBrowser() ) {
+                rc = new BrowserSubscription(this, consumerInfo);
+            } else {
+                rc = new QueueSubscription(this, consumerInfo);
+            }
         } else {
             rc = new TopicSubscription(this, consumerInfo);
         }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java Wed Mar 12 20:16:19 2008
@@ -27,6 +27,8 @@
 import org.apache.activemq.broker.router.api.DestinationManager;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTopic;
 
 /**
@@ -41,13 +43,15 @@
     private DestinationManagerCallback destinationManagerCallback;
 
     public interface DestinationManagerCallback {
-        Destination createQueue(ActiveMQQueue name);
 
+        Destination createQueue(ActiveMQQueue name);
         Destination createTopic(ActiveMQTopic name);
+        Destination createTempQueue(ActiveMQTempQueue name);
+        Destination createTempTopic(ActiveMQTempTopic name);
 
         void addDestination(Destination destination) throws Exception;
-
         void removeDestination(Destination destination, boolean force) throws Exception;
+
     }
 
     public SimpleDestinationManager() {
@@ -99,6 +103,13 @@
         case ActiveMQDestination.TOPIC_TYPE:
             destination = destinationManagerCallback.createTopic((ActiveMQTopic) name);
             break;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            destination = destinationManagerCallback.createTempQueue((ActiveMQTempQueue) name);
+            break;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            destination = destinationManagerCallback.createTempTopic((ActiveMQTempTopic) name);
+            break;
+            
         default:
             throw new IllegalStateException("Invalid destination type: " + type);
         }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java Wed Mar 12 20:16:19 2008
@@ -209,7 +209,7 @@
         }
 
         for (Destination destination : destinations) {
-            destination.enqueue(requestContext, message, completionHandler);
+            destination.add(requestContext, message, completionHandler);
         }
     }
 

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java Wed Mar 12 20:16:19 2008
@@ -54,6 +54,9 @@
     }
 
     public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) {
+        if( source.getInfo().isBrowser() ) {
+            return true;
+        }
         return ref.lock();
     }
 

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserStoreSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserStoreSubscription.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserStoreSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserStoreSubscription.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.queue;
+
+import java.util.LinkedList;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.core.BroadcastStoreSubscription;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+
+/**
+ * 
+ * @author chirino
+ */
+public class BrowserStoreSubscription extends BroadcastStoreSubscription {
+
+    private boolean browseTerminated;
+
+    public BrowserStoreSubscription(DataStore dataStore, ReferenceStore referenceStore, ClientSubscription clientSubscription) {
+        super(dataStore, referenceStore, clientSubscription);
+    }
+
+    @Override
+    protected void load(LinkedList<CacheEntry> target, int max) throws Exception {
+        // Only load from the datastore.. a browser does not get in flight messages or redeliveries.
+        loadFromDataStore(target, max);        
+    }
+
+    public boolean isBrowseTerminated() {
+        return browseTerminated;
+    }    
+    
+    @Override
+    public void pump() throws Exception {
+        super.pump();        
+        if( prefetch.isEmpty() && !browseTerminated ) {
+            browseTerminated=true;
+            clientSubscription.removeStoreSubscription(this);
+        }
+    }
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserSubscription.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BrowserSubscription.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.queue;
+
+import java.util.List;
+
+import org.apache.activemq.broker.router.api.StoreSubscription;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.core.topic.TopicSubscription;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
+
+
+/**
+ * 
+ * @author chirino
+ *
+ */
+public class BrowserSubscription extends TopicSubscription {
+    boolean atEndOfBrowse;
+    public BrowserSubscription(Router router, ConsumerInfo info) throws Exception {
+        super(router, info);
+    }
+    
+    @Override
+    public List<Long> removeStoreSubscription(StoreSubscription destination) {
+        List<Long> rc = super.removeStoreSubscription(destination);
+        synchronized(prefetchWindowMutex) {
+            if( started &&  stores.isEmpty() && !atEndOfBrowse ) {
+                atEndOfBrowse=true;
+                dispatchEndOfBrowse();
+            }
+        }
+        return rc;
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        synchronized(prefetchWindowMutex) {
+            if( stores.isEmpty() && !atEndOfBrowse ) {
+                atEndOfBrowse=true;
+                dispatchEndOfBrowse();
+            }
+        }
+    }
+
+    private void dispatchEndOfBrowse() {
+
+        final MessageDispatch md = new MessageDispatch();
+        md.setConsumerId(info.getConsumerId());
+        md.setDestination(info.getDestination());
+        md.setMessage(null);
+        md.setTransmitCallback(new Runnable() {
+            public void run() {
+            }
+        });
+        
+        dispatch(null, md);
+        
+    }
+}

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java Wed Mar 12 20:16:19 2008
@@ -24,6 +24,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.api.StoreSubscription;
 import org.apache.activemq.broker.router.core.MultiDestinationClientSubscription;
 import org.apache.activemq.broker.router.core.Router;
 import org.apache.activemq.broker.router.store.api.CacheEntry;
@@ -76,6 +77,21 @@
         }
     }
 
+    
+    @Override
+    public List<Long> removeStoreSubscription(StoreSubscription destination) {
+        super.removeStoreSubscription(destination);
+        synchronized (clientPrefetch) {
+            ArrayList<Long> rc = new ArrayList<Long>(clientPrefetch.size());
+            for (PrefetchItem pi : clientPrefetch) {
+                if( pi.dataStore == destination.getDataStore() ) {
+                    rc.add(pi.storeId);
+                }
+            }
+            return rc;
+        }
+    }
+    
     protected void standardAck(final RequestContext context, final MessageAck ack) throws Exception {
         boolean inAckRange = false;
         boolean callDispatchMatched = false;

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java Wed Mar 12 20:16:19 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.router.index.api;
 
+import java.util.List;
+
 import org.apache.activemq.kaha.impl.async.Location;
 
 /**
@@ -61,5 +63,7 @@
     public void setAutoRemove(boolean enable);
 
     public boolean contains(long id) throws Exception;
+
+    public List<IndexEntry> load(List<Long> messageIds) throws Exception;
 
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java Wed Mar 12 20:16:19 2008
@@ -34,4 +34,6 @@
 
     public List<IndexEntry> remove(IndexEntry first, IndexEntry last, int max) throws Exception;
 
+    public DataIndex getDataIndex();
+
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java Wed Mar 12 20:16:19 2008
@@ -275,6 +275,56 @@
 
         });
     }
+    
+    public List<IndexEntry> load(final List<Long> messageIds) throws Exception {
+        
+        List<IndexRecord> records = executeUnTransacted(new EntityManagerAction<List<IndexRecord>>() {
+            @SuppressWarnings("unchecked")
+            public List<IndexRecord> execute(EntityManager em) {
+                
+                StringBuffer q =  new StringBuffer("SELECT r FROM IndexRecord r WHERE r.store=?1 AND r.id IN (");
+                int c=2;
+                for (Long id : messageIds) {
+                    if( c!= 2) {
+                        q.append(",");
+                    }
+                    q.append("?");
+                    q.append(c);
+                    c++;                    
+                }
+                q.append(")");
+                
+                Query query = em.createQuery(q.toString());
+                query.setParameter(1, storeMetadata.getId());
+                c = 2;
+                for (Long id : messageIds) {
+                    query.setParameter(c, id);
+                    c++;                    
+                }
+
+                return query.getResultList();
+            }
+
+        });
+
+        // I don't think we can assume that the SELECT will gives us the records in the original order
+        // of the messageIds list.. lets put 'em in a map so we can pick 'em out in the right order.
+        HashMap<Long, IndexRecord> map = new HashMap<Long, IndexRecord>();
+        for (IndexRecord r : records) {
+            r.setIndex(JpaDataIndex.this);
+            map.put(r.getId(), r);
+        }
+        
+        ArrayList<IndexEntry> rc = new ArrayList<IndexEntry>(records.size());
+        for (Long id : messageIds) {
+            IndexRecord r = map.get(id);
+            if( r!=null ) {
+                rc.add(r);
+            }
+        }
+        return rc;
+    }
+
 
     // /////////////////////////////////////////////////////////////////
     // Implementation Methods

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java Wed Mar 12 20:16:19 2008
@@ -22,6 +22,7 @@
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
+import org.apache.activemq.broker.router.index.api.DataIndex;
 import org.apache.activemq.broker.router.index.api.IndexEntry;
 import org.apache.activemq.broker.router.index.api.ReferenceIndex;
 import org.apache.activemq.broker.router.index.jpa.JpaDataIndexManager.EntityManagerAction;
@@ -115,7 +116,8 @@
 
     @SuppressWarnings("unchecked")
     public List<IndexEntry> remove(IndexEntry firstIE, IndexEntry lastIE, final int max) throws Exception {
-        final ArrayList<IndexEntry> rc = new ArrayList<IndexEntry>();
+        
+        final ArrayList<IndexEntry> rc = new ArrayList<IndexEntry>(max);
         while (rc.size() < max) {
             final List<IndexEntry> l = load(firstIE, lastIE, max);
             int skipped = executeTransacted(new EntityManagerAction<Integer>() {
@@ -131,6 +133,8 @@
                         query.setParameter(1, rr.getRowId());
                         if (query.executeUpdate() != 1) {
                             skipped++;
+                        } else {
+                            rc.add(ie);
                         }
                     }
                     return skipped;
@@ -215,6 +219,10 @@
             query.setParameter(c, last.getRowId());
         }
         return query;
+    }
+
+    public DataIndex getDataIndex() {
+        return dataIndex;
     }
 
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java Wed Mar 12 20:16:19 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.router.store.api;
 
+import java.util.List;
+
 import org.apache.activemq.broker.router.api.Destination;
 import org.apache.activemq.broker.router.store.api.DataStoreManager.TransactionAction;
 import org.apache.activemq.command.Message;
@@ -109,4 +111,6 @@
      * @param enable
      */
     public void setAutoRemove(boolean enable);
+
+    public List<CacheEntry> load(List<Long> messageIds) throws Exception;
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java Wed Mar 12 20:16:19 2008
@@ -59,5 +59,7 @@
     public RemoveReferenceTransactionAction removeReferenceTransacted(long id, TransactionId tx, Runnable onCompleted) throws Exception;
 
     public List<CacheEntry> remove(CacheEntry first, CacheEntry last, int max) throws Exception;
+    
+    DataStore getDataStore();
 
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java Wed Mar 12 20:16:19 2008
@@ -175,10 +175,6 @@
 
             assert message != null : "Once locked the message should not be null.";
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("LOCKED: " + indexEntry.getId());
-            }
-
             return locked = true;
         }
 
@@ -281,6 +277,17 @@
 
         return rc;
     }
+    
+    public List<CacheEntry> load(List<Long> messageIds) throws Exception {
+        List<IndexEntry> load = index.load(messageIds);
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(load.size());
+        for (IndexEntry ie : load) {
+            CacheEntry cacheEntry = load(ie);
+            rc.add(cacheEntry);
+        }
+        return rc;
+    }
+
 
     public void remove(long id, Runnable onCompleted) throws Exception {
         journal.removeData(getName(), id, null, onCompleted);

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java Wed Mar 12 20:16:19 2008
@@ -58,6 +58,7 @@
  */
 public class JournalDataStoreManager implements DataStoreManager {
 
+    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
     private static final Log LOG = LogFactory.getLog(JournalDataStoreManager.class);
 
     private final JournalStoreManager<DataStore> stores = new JournalStoreManager<DataStore>() {
@@ -93,6 +94,7 @@
     private Map<TransactionId, List<StoreTxJournalEntry>> inFlightTransactions = new HashMap<TransactionId, List<StoreTxJournalEntry>>();
     private Timer cleanup = new Timer();
     long cleanupInterval = 1000 * 60;
+    private boolean failIfJournalIsLocked;
 
     public void start() throws Exception {
 
@@ -106,6 +108,23 @@
             throw new IllegalStateException("The dataIndexManager property must be specified.");
         }
 
+        if (failIfJournalIsLocked) {
+            asyncDataManager.lock();
+        } else {
+            while (true) {
+                try {
+                    asyncDataManager.lock();
+                    break;
+                } catch (IOException e) {
+                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
+                    try {
+                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+                    } catch (InterruptedException e1) {
+                    }
+                }
+            }
+        }
+
         LOG.info("Journal starting");
         asyncDataManager.start();
         dataIndexManager.start();
@@ -447,6 +466,14 @@
 
     public void setCleanupInterval(long cleanupInterval) {
         this.cleanupInterval = cleanupInterval;
+    }
+
+    public boolean isFailIfJournalIsLocked() {
+        return failIfJournalIsLocked;
+    }
+
+    public void setFailIfJournalIsLocked(boolean failIfJournalIsLocked) {
+        this.failIfJournalIsLocked = failIfJournalIsLocked;
     }
 
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java Wed Mar 12 20:16:19 2008
@@ -201,4 +201,8 @@
     public ReferenceIndex getIndex() {
         return index;
     }
+
+    public DataStore getDataStore() {
+        return dataStore;
+    }
 }

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java Wed Mar 12 20:16:19 2008
@@ -285,6 +285,19 @@
         }
         return rc;
     }
+    
+    public List<CacheEntry> load(List<Long> messageIds) throws Exception {
+        
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(messageIds.size());
+        for (Long id : messageIds) {
+            MemoryCacheEntry mce = cache.get(id);
+            if( mce!=null ) {
+                rc.add(mce);
+            }
+        }
+        return rc;        
+    }
+
 
     public void remove(long id, Runnable onCompleted) throws Exception {
         MemoryCacheEntry record = cache.remove(id);

Modified: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java (original)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java Wed Mar 12 20:16:19 2008
@@ -239,4 +239,8 @@
         };
     }
 
+    public DataStore getDataStore() {
+        return dataStore;
+    }
+
 }

Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java (original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java Wed Mar 12 20:16:19 2008
@@ -36,7 +36,7 @@
     public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception {
     }
 
-    public void enqueue(RequestContext requestContext, Message message, Runnable onStored) throws Exception {
+    public void add(RequestContext requestContext, Message message, Runnable onStored) throws Exception {
     }
 
     public AtomicInteger getConsumerCounter() {

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexBasicTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexBasicTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexBasicTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexBasicTestSupport.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+
+abstract public class IndexBasicTestSupport extends IndexTestSupport {
+
+    public void testCreateDestroyDataIndex() throws Exception {
+        String indexName = getName();
+
+        assertNull(dataIndexManager.getStore(indexName));
+        List<DataIndex> indexes = dataIndexManager.getStores();
+        assertTrue(indexes.isEmpty());
+
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        assertNotNull(dataIndex);
+
+        assertSame(dataIndex, dataIndexManager.getStore(indexName));
+        indexes = dataIndexManager.getStores();
+        assertEquals(1, indexes.size());
+        assertTrue(indexes.contains(dataIndex));
+
+        // Verify that the data index create was persisted between restart.
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertNotNull(dataIndex);
+        assertEquals(indexName, dataIndex.getName());
+        indexes = dataIndexManager.getStores();
+        assertEquals(1, indexes.size());
+        assertTrue(indexes.contains(dataIndex));
+
+        // Verify that the data index remove was persisted between restart.
+        dataIndexManager.removeStore(dataIndex);
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertNull(dataIndex);
+        indexes = dataIndexManager.getStores();
+        assertTrue(indexes.isEmpty());
+    }
+
+    public void testCreateDestroyReferenceIndex() throws Exception {
+
+        String dataName = getName();
+        String refName = getName() + "-ref";
+        DataIndex manager = dataIndexManager.addStore(dataName);
+
+        assertNull(manager.getStore(refName));
+        List<ReferenceIndex> indexes = manager.getStores();
+        assertTrue(indexes.isEmpty());
+
+        ReferenceIndex dataIndex = manager.addStore(refName);
+        assertNotNull(dataIndex);
+
+        assertSame(dataIndex, manager.getStore(refName));
+        indexes = manager.getStores();
+        assertEquals(1, indexes.size());
+        assertTrue(indexes.contains(dataIndex));
+
+        // Verify that the data index create was persisted between restart.
+        restartDataIndexManager();
+        manager = dataIndexManager.getStore(dataName);
+
+        dataIndex = manager.getStore(refName);
+        assertNotNull(dataIndex);
+        assertEquals(refName, dataIndex.getName());
+        indexes = manager.getStores();
+        assertEquals(1, indexes.size());
+        assertTrue(indexes.contains(dataIndex));
+
+        // Verify that the data index remove was persisted between restart.
+        manager.removeStore(dataIndex);
+        restartDataIndexManager();
+        manager = dataIndexManager.getStore(dataName);
+
+        dataIndex = manager.getStore(refName);
+        assertNull(dataIndex);
+        indexes = manager.getStores();
+        assertTrue(indexes.isEmpty());
+
+    }
+
+    /**
+     * Verify that index properties can be stored without any issues.
+     * 
+     * @throws Exception
+     */
+    public void testIndexProperties() throws Exception {
+
+        String indexName = getName();
+
+        // Make a relatively large property object..
+        Map<String, String> properties = new HashMap<String, String>();
+        for (int i = 0; i < 1000; i++) {
+            properties.put("key" + i, "value" + i);
+        }
+
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        dataIndex.setProperties(properties);
+        assertEquals(properties, dataIndex.getProperties());
+
+        // Restart and verify the the properties were preserved.
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertEquals(properties, dataIndex.getProperties());
+
+        dataIndex.setProperties(null);
+
+        String refName = indexName + "-ref";
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+        refIndex.setProperties(properties);
+
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertNull(dataIndex.getProperties());
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(properties, refIndex.getProperties());
+
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a DataIndex.
+     * 
+     * @throws Exception
+     */
+    public void testDataIndexCRD() throws Exception {
+        String indexName = getName();
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+
+        int count = 10;
+        IndexEntry ie[] = new IndexEntry[count];
+        for (int i = 0; i < count; i++) {
+            ie[i] = dataIndex.addMessage(new Long(i), createLocation(0, i));
+            assertNotNull(ie[i]);
+            assertNotNull(ie[i].getIndex());
+            assertNotNull(ie[i].getId());
+            assertNotNull(ie[i].getLocation());
+        }
+
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+
+        assertEquals(count, dataIndex.size());
+
+        List<IndexEntry> loaded = dataIndex.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (IndexEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getIndex());
+            assertEquals(ie[i], entry);
+            i++;
+        }
+
+        // Lets delete every other record.
+        int deleteCount = 0;
+        i = 0;
+        for (IndexEntry entry : loaded) {
+            if (i % 2 == 0) {
+                dataIndex.remove(entry.getId());
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+
+        assertEquals(count - deleteCount, dataIndex.size());
+
+        loaded = dataIndex.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<IndexEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (!(i % 2 == 0)) {
+                IndexEntry entry = iterator.next();
+                assertEquals(ie[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out his data.
+        dataIndexManager.removeStore(dataIndex);
+        dataIndex = dataIndexManager.addStore(indexName);
+        assertEquals(0, dataIndex.size());
+
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceIndex.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceIndexCreateLoadDelete() throws Exception {
+        String indexName = getName();
+        String refName = getName() + "-ref";
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+
+        int count = 10;
+        IndexEntry ie[] = new IndexEntry[count];
+        for (int i = 0; i < count; i++) {
+            ie[i] = dataIndex.addMessage(i, createLocation(0, i));
+            assertNotNull(ie[i]);
+            assertNotNull(ie[i].getIndex());
+            assertNotNull(ie[i].getId());
+            assertNotNull(ie[i].getLocation());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refIndex.addReference(ie[i]);
+        }
+
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(count, refIndex.size());
+
+        List<IndexEntry> loaded = refIndex.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (IndexEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getIndex());
+            assertEquals(ie[i], entry);
+            i++;
+        }
+
+        // Lets delete every other record.
+        int deleteCount = 0;
+        i = 0;
+        for (IndexEntry entry : loaded) {
+            if (i % 2 == 0) {
+                refIndex.remove(entry.getId());
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(count - deleteCount, refIndex.size());
+
+        loaded = refIndex.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<IndexEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (!(i % 2 == 0)) {
+                IndexEntry entry = iterator.next();
+                assertEquals(ie[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out his data.
+        dataIndex.removeStore(refIndex);
+        refIndex = dataIndex.addStore(refName);
+        assertEquals(0, refIndex.size());
+
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceIndex.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceIndexCreateRemove() throws Exception {
+        String indexName = getName();
+        String refName = getName() + "-ref";
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+
+        int count = 10;
+        IndexEntry ie[] = new IndexEntry[count];
+        for (int i = 0; i < count; i++) {
+            ie[i] = dataIndex.addMessage(i, createLocation(0, i));
+            assertNotNull(ie[i]);
+            assertNotNull(ie[i].getIndex());
+            assertNotNull(ie[i].getId());
+            assertNotNull(ie[i].getLocation());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refIndex.addReference(ie[i]);
+        }
+
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(count, refIndex.size());
+
+        List<IndexEntry> loaded = refIndex.remove(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (IndexEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getIndex());
+            assertEquals(ie[i], entry);
+            i++;
+        }
+
+        // Restart and verify that all the records were removed.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(0, refIndex.size());
+
+    }
+
+}

Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java (original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java Wed Mar 12 20:16:19 2008
@@ -16,22 +16,18 @@
  */
 package org.apache.activemq.broker.router.index;
 
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import junit.framework.TestCase;
 
 import org.apache.activemq.broker.router.index.api.DataIndex;
 import org.apache.activemq.broker.router.index.api.DataIndexManager;
 import org.apache.activemq.broker.router.index.api.IndexEntry;
-import org.apache.activemq.broker.router.index.api.ReferenceIndex;
 import org.apache.activemq.kaha.impl.async.Location;
 
 abstract public class IndexTestSupport extends TestCase {
 
-    DataIndexManager dataIndexManager;
+    protected DataIndexManager dataIndexManager;
 
     @Override
     protected void setUp() throws Exception {
@@ -59,194 +55,6 @@
         dataIndexManager = null;
     }
 
-    public void testCreateDestroyDataIndex() throws Exception {
-        String indexName = getName();
-
-        assertNull(dataIndexManager.getStore(indexName));
-        List<DataIndex> indexes = dataIndexManager.getStores();
-        assertTrue(indexes.isEmpty());
-
-        DataIndex dataIndex = dataIndexManager.addStore(indexName);
-        assertNotNull(dataIndex);
-
-        assertSame(dataIndex, dataIndexManager.getStore(indexName));
-        indexes = dataIndexManager.getStores();
-        assertEquals(1, indexes.size());
-        assertTrue(indexes.contains(dataIndex));
-
-        // Verify that the data index create was persisted between restart.
-        restartDataIndexManager();
-
-        dataIndex = dataIndexManager.getStore(indexName);
-        assertNotNull(dataIndex);
-        assertEquals(indexName, dataIndex.getName());
-        indexes = dataIndexManager.getStores();
-        assertEquals(1, indexes.size());
-        assertTrue(indexes.contains(dataIndex));
-
-        // Verify that the data index remove was persisted between restart.
-        dataIndexManager.removeStore(dataIndex);
-        restartDataIndexManager();
-
-        dataIndex = dataIndexManager.getStore(indexName);
-        assertNull(dataIndex);
-        indexes = dataIndexManager.getStores();
-        assertTrue(indexes.isEmpty());
-    }
-
-    public void testCreateDestroyReferenceIndex() throws Exception {
-
-        String dataName = getName();
-        String refName = getName() + "-ref";
-        DataIndex manager = dataIndexManager.addStore(dataName);
-
-        assertNull(manager.getStore(refName));
-        List<ReferenceIndex> indexes = manager.getStores();
-        assertTrue(indexes.isEmpty());
-
-        ReferenceIndex dataIndex = manager.addStore(refName);
-        assertNotNull(dataIndex);
-
-        assertSame(dataIndex, manager.getStore(refName));
-        indexes = manager.getStores();
-        assertEquals(1, indexes.size());
-        assertTrue(indexes.contains(dataIndex));
-
-        // Verify that the data index create was persisted between restart.
-        restartDataIndexManager();
-        manager = dataIndexManager.getStore(dataName);
-
-        dataIndex = manager.getStore(refName);
-        assertNotNull(dataIndex);
-        assertEquals(refName, dataIndex.getName());
-        indexes = manager.getStores();
-        assertEquals(1, indexes.size());
-        assertTrue(indexes.contains(dataIndex));
-
-        // Verify that the data index remove was persisted between restart.
-        manager.removeStore(dataIndex);
-        restartDataIndexManager();
-        manager = dataIndexManager.getStore(dataName);
-
-        dataIndex = manager.getStore(refName);
-        assertNull(dataIndex);
-        indexes = manager.getStores();
-        assertTrue(indexes.isEmpty());
-
-    }
-
-    /**
-     * Verify that index properties can be stored without any issues.
-     * 
-     * @throws Exception
-     */
-    public void testIndexProperties() throws Exception {
-
-        String indexName = getName();
-
-        // Make a relatively large property object..
-        Map<String, String> properties = new HashMap<String, String>();
-        for (int i = 0; i < 1000; i++) {
-            properties.put("key" + i, "value" + i);
-        }
-
-        DataIndex dataIndex = dataIndexManager.addStore(indexName);
-        dataIndex.setProperties(properties);
-        assertEquals(properties, dataIndex.getProperties());
-
-        // Restart and verify the the properties were preserved.
-        restartDataIndexManager();
-
-        dataIndex = dataIndexManager.getStore(indexName);
-        assertEquals(properties, dataIndex.getProperties());
-
-        dataIndex.setProperties(null);
-
-        String refName = indexName + "-ref";
-        ReferenceIndex refIndex = dataIndex.addStore(refName);
-        refIndex.setProperties(properties);
-
-        restartDataIndexManager();
-
-        dataIndex = dataIndexManager.getStore(indexName);
-        assertNull(dataIndex.getProperties());
-        refIndex = dataIndex.getStore(refName);
-
-        assertEquals(properties, refIndex.getProperties());
-
-    }
-
-    /**
-     * Verify that the create retrieve and delete operations work properly
-     * against a DataIndex.
-     * 
-     * @throws Exception
-     */
-    public void testDataIndexCRD() throws Exception {
-        String indexName = getName();
-        DataIndex dataIndex = dataIndexManager.addStore(indexName);
-
-        int count = 10;
-        IndexEntry ie[] = new IndexEntry[count];
-        for (int i = 0; i < count; i++) {
-            ie[i] = dataIndex.addMessage(new Long(i), createLocation(0, i));
-            assertNotNull(ie[i]);
-            assertNotNull(ie[i].getIndex());
-            assertNotNull(ie[i].getId());
-            assertNotNull(ie[i].getLocation());
-        }
-
-        restartDataIndexManager();
-        dataIndex = dataIndexManager.getStore(indexName);
-
-        assertEquals(count, dataIndex.size());
-
-        List<IndexEntry> loaded = dataIndex.load(null, null, count * 2);
-        assertEquals(count, loaded.size());
-
-        // Verify that all the entries were loaded in the order inserted.
-        int i = 0;
-        for (IndexEntry entry : loaded) {
-            assertNotNull(entry);
-            assertNotNull(entry.getIndex());
-            assertEquals(ie[i], entry);
-            i++;
-        }
-
-        // Lets delete every other record.
-        int deleteCount = 0;
-        i = 0;
-        for (IndexEntry entry : loaded) {
-            if (i % 2 == 0) {
-                dataIndex.remove(entry.getId());
-                deleteCount++;
-            }
-            i++;
-        }
-
-        // Restart and verify that the right records were removed.
-        restartDataIndexManager();
-        dataIndex = dataIndexManager.getStore(indexName);
-
-        assertEquals(count - deleteCount, dataIndex.size());
-
-        loaded = dataIndex.load(null, null, count * 2);
-        assertEquals(count - deleteCount, loaded.size());
-
-        Iterator<IndexEntry> iterator = loaded.iterator();
-        for (int j = 0; j < count; j++) {
-            if (!(i % 2 == 0)) {
-                IndexEntry entry = iterator.next();
-                assertEquals(ie[j], entry);
-            }
-        }
-
-        // Verify that removing a store wipes out his data.
-        dataIndexManager.removeStore(dataIndex);
-        dataIndex = dataIndexManager.addStore(indexName);
-        assertEquals(0, dataIndex.size());
-
-    }
 
     protected void assertEquals(IndexEntry expected, IndexEntry actual) {
         assertEquals(expected.getId(), actual.getId());
@@ -258,92 +66,11 @@
         assertEquals(expected.getDataFileId(), actual.getDataFileId());
     }
 
-    private Location createLocation(int fileId, int offset) {
+    protected Location createLocation(int fileId, int offset) {
         Location rc = new Location();
         rc.setDataFileId(fileId);
         rc.setOffset(offset);
         return rc;
-    }
-
-    /**
-     * Verify that the create retrieve and delete operations work properly
-     * against a ReferenceIndex.
-     * 
-     * @throws Exception
-     */
-    public void testReferenceIndexCRD() throws Exception {
-        String indexName = getName();
-        String refName = getName() + "-ref";
-        DataIndex dataIndex = dataIndexManager.addStore(indexName);
-        ReferenceIndex refIndex = dataIndex.addStore(refName);
-
-        int count = 10;
-        IndexEntry ie[] = new IndexEntry[count];
-        for (int i = 0; i < count; i++) {
-            ie[i] = dataIndex.addMessage(new Long(i), createLocation(0, i));
-            assertNotNull(ie[i]);
-            assertNotNull(ie[i].getIndex());
-            assertNotNull(ie[i].getId());
-            assertNotNull(ie[i].getLocation());
-        }
-
-        for (int i = 0; i < count; i++) {
-            refIndex.addReference(ie[i]);
-        }
-
-        restartDataIndexManager();
-
-        dataIndex = dataIndexManager.getStore(indexName);
-        refIndex = dataIndex.getStore(refName);
-
-        assertEquals(count, refIndex.size());
-
-        List<IndexEntry> loaded = refIndex.load(null, null, count * 2);
-        assertEquals(count, loaded.size());
-
-        // Verify that all the entries were loaded in the order inserted.
-        int i = 0;
-        for (IndexEntry entry : loaded) {
-            assertNotNull(entry);
-            assertNotNull(entry.getIndex());
-            assertEquals(ie[i], entry);
-            i++;
-        }
-
-        // Lets delete every other record.
-        int deleteCount = 0;
-        i = 0;
-        for (IndexEntry entry : loaded) {
-            if (i % 2 == 0) {
-                refIndex.remove(entry.getId());
-                deleteCount++;
-            }
-            i++;
-        }
-
-        // Restart and verify that the right records were removed.
-        restartDataIndexManager();
-        dataIndex = dataIndexManager.getStore(indexName);
-        refIndex = dataIndex.getStore(refName);
-
-        assertEquals(count - deleteCount, refIndex.size());
-
-        loaded = refIndex.load(null, null, count * 2);
-        assertEquals(count - deleteCount, loaded.size());
-
-        Iterator<IndexEntry> iterator = loaded.iterator();
-        for (int j = 0; j < count; j++) {
-            if (!(i % 2 == 0)) {
-                IndexEntry entry = iterator.next();
-                assertEquals(ie[j], entry);
-            }
-        }
-
-        // Verify that removing a store wipes out his data.
-        dataIndex.removeStore(refIndex);
-        refIndex = dataIndex.addStore(refName);
-        assertEquals(0, refIndex.size());
-
     }
 
 }



Mime
View raw message