activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633639 [5/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active...
Date Tue, 04 Mar 2008 21:01:57 GMT
Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Provides the Journal interface which is a simple facade to an AsyncDataManager.  It handles encoding and decoding all the journal entries stored in the AsyncDataManager.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,28 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A persistent implementation of the org.apache.activemq.broker.router.store.api package.  Operations are appended to a journal log file which allows
+for very fast concurrent operations.  It makes use of an implementation of the org.apache.activemq.broker.router.index.api to actually track what messages are in which
+destination.  The interface to org.apache.activemq.broker.router.index.api is designed so that index and asynchronously update it's data files and if it looses it some 
+of the last updates due to a failure, the journal will re-do those lost operations so that it stays consistent. 
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.broker.router.util.LinkedNode;
+import org.apache.activemq.broker.router.util.LinkedNodeList;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * 
+ * @author chirino
+ */
+public class MemoryDataStore extends MemoryStore implements DataStore {
+    // private final static Log LOG = LogFactory.getLog(MemoryDataStore.class);
+
+    private final MemoryStoreManager<ReferenceStore> stores = new MemoryStoreManager<ReferenceStore>() {
+        @Override
+        protected ReferenceStore createStore(String name) {
+            return new MemoryReferenceStore(name, MemoryDataStore.this);
+        }
+
+        @Override
+        protected void destroyStore(ReferenceStore store) {
+        }
+    };
+
+    public class MemoryCacheEntry extends LinkedNode<MemoryCacheEntry> implements CacheEntry {
+
+        // Tracks load requests by the broker.. keeps the message in memory
+        private int loadCounter;
+        // Tracks how many reference containers are using this entry.. make it
+        // easier to delete unused entries.
+        private int referenceCounter;
+
+        public final long id;
+        public Message message;
+        public boolean locked;
+        public long sequenceId;
+
+        public MemoryCacheEntry(long id, Message data) {
+            this.id = id;
+            this.message = data;
+        }
+
+        public Message getMessage() {
+            return message;
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public void incrementRedeliveryCounter() {
+        }
+
+        public DataStore getStore() {
+            return MemoryDataStore.this;
+        }
+
+        synchronized public void load() {
+            loadCounter++;
+        }
+
+        synchronized public boolean loadUnlessDropped() {
+            if (message != null) {
+                loadCounter++;
+                return true;
+            }
+            return false;
+        }
+
+        synchronized public void unload() {
+            if (!(loadCounter >= 0)) {
+                System.out.println("unload() should match a corresponding load() call. ");
+            }
+
+            // assert loadCounter >= 0: "unload() should match a corresponding
+            // load() call. ";
+            loadCounter--;
+            if (autoRemove && message != null && loadCounter < 1 && referenceCounter < 1) {
+                systemUsage.getMemoryUsage().decreaseUsage(message.getSize());
+                message = null;
+            }
+        }
+
+        synchronized public void reference() {
+            referenceCounter++;
+        }
+
+        synchronized public void unreference() {
+            assert referenceCounter > 0 : "unreference() should only be called to undo a previous reference()";
+            referenceCounter--;
+            if (autoRemove && message != null && loadCounter < 1 && referenceCounter < 1) {
+                systemUsage.getMemoryUsage().decreaseUsage(message.getSize());
+                message = null;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "id: " + id + ", linked: " + isLinked() + ", loadCounter: " + loadCounter + ", referenceCounter: " + referenceCounter;
+        }
+
+        synchronized public boolean lock() {
+            if (locked)
+                return false;
+            locked = true;
+            return true;
+        }
+
+        synchronized public void unlock() {
+            locked = false;
+        }
+
+        synchronized public void drop() {
+            if (message != null) {
+                systemUsage.getMemoryUsage().decreaseUsage(message.getSize());
+                message = null;
+            }
+        }
+    }
+
+    private final Map<Long, MemoryCacheEntry> cache = new ConcurrentHashMap<Long, MemoryCacheEntry>();
+    private final LinkedNodeList<MemoryCacheEntry> records = new LinkedNodeList<MemoryCacheEntry>();
+    private final AtomicReference<MemoryCacheEntry> lastRecordId = new AtomicReference<MemoryCacheEntry>();
+    private long lastSequenceId;;
+
+    private SystemUsage systemUsage;
+    private Destination destination;
+    private boolean autoRemove;
+
+    public MemoryDataStore(String name) {
+        super(name);
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // StoreManager related methods
+    // /////////////////////////////////////////////////////////////////
+    public ReferenceStore addStore(String name) throws Exception {
+        return stores.addStore(name);
+    }
+
+    public ReferenceStore getStore(String name) throws Exception {
+        return stores.getStore(name);
+    }
+
+    public List<ReferenceStore> getStores() throws Exception {
+        return stores.getStores();
+    }
+
+    public void removeStore(ReferenceStore store) throws Exception {
+        stores.removeStore(store);
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Storage methods
+    // /////////////////////////////////////////////////////////////////
+    public MemoryCacheEntry addMessage(long id, Message record, Runnable onCompleted) throws Exception {
+
+        MemoryCacheEntry rc = new MemoryCacheEntry(id, record);
+        if (!autoRemove) {
+            synchronized (records) {
+                rc.sequenceId = ++lastSequenceId;
+                records.addLast(rc);
+            }
+            cache.put(rc.getId(), rc);
+        }
+        systemUsage.getMemoryUsage().increaseUsage(record.getSize());
+        rc.load();
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+        return rc;
+    }
+
+    public AddMessageTransactionAction addTransactedMessage(final long id, final Message message, final TransactionId tx, Runnable onCompleted) throws Exception {
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+        return new AddMessageTransactionAction() {
+
+            public Message getMessage() {
+                return message;
+            }
+
+            public DataStore getDataStore() {
+                return MemoryDataStore.this;
+            }
+
+            public TransactionId getTransactionId() {
+                return tx;
+            }
+
+            public CacheEntry complete() throws Exception {
+                return addMessage(id, message, null);
+            }
+
+        };
+    }
+
+    public List<CacheEntry> load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception {
+
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(max);
+        if (rc.size() == max) {
+            return rc;
+        }
+
+        MemoryCacheEntry first = (MemoryCacheEntry) firstCE;
+        MemoryCacheEntry last = (MemoryCacheEntry) lastCE;
+
+        boolean inRange = first == null ? true : false;
+        MemoryCacheEntry record = (MemoryCacheEntry) first;
+
+        synchronized (records) {
+            if (records.isEmpty()) {
+                return rc;
+            }
+            if (record == null) {
+                record = records.getHead();
+            }
+
+            // record might be a stale cache entry that has already been
+            // unlinked. But we can still
+            // use it cause it has valid pointers to the next record location.
+            MemoryCacheEntry lastId = record;
+            // MemoryCacheEntry firstId = record;
+            while (true) {
+                // Don't load more than the max messages.
+                if (inRange) {
+                    if (rc.size() == max) {
+                        return rc;
+                    }
+                    if (record.isLinked() && (last == null || record.sequenceId <= last.sequenceId)) {
+                        rc.add(record);
+                    } else {
+                        // Don't load past the last message.
+                        break;
+                    }
+                } else {
+                    // Don't load before the first message.
+                    if (record.sequenceId > first.sequenceId) {
+                        inRange = true;
+                        if (record.isLinked()) {
+                            rc.add(record);
+                        }
+                    }
+                }
+                // Dont go past that last record.
+                if (record == records.getTail()) {
+                    break;
+                }
+                record = record.getNextCircular();
+
+                // Lets make sure we are not looping around.
+                assert record.sequenceId > lastId.sequenceId : "Load seems to be looping around the linked list.";
+                lastId = record;
+            }
+            ;
+        }
+        return rc;
+    }
+
+    public void remove(long id, Runnable onCompleted) throws Exception {
+        MemoryCacheEntry record = cache.remove(id);
+        if (record != null) {
+            record.drop();
+            synchronized (records) {
+                record.unlink();
+            }
+        }
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+    }
+
+    public RemoveMesageTransactionAction removeTransacted(final long id, final TransactionId tx, Runnable onCompleted) throws Exception {
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+        return new RemoveMesageTransactionAction() {
+            public long getId() {
+                return id;
+            }
+
+            public DataStore getDataStore() {
+                return MemoryDataStore.this;
+            }
+
+            public TransactionId getTransactionId() {
+                return tx;
+            }
+
+            public void complete() throws Exception {
+                remove(id, null);
+            }
+        };
+    }
+
+    public void removeUnreferencedRecords(CacheEntry maxId) throws Exception {
+        // We remove unreferenced records on the fly using reference counting..
+    }
+
+    public long size() throws Exception {
+        synchronized (records) {
+            return records.size();
+        }
+    }
+
+    public MemoryCacheEntry getLastAddedEntry() throws Exception {
+        return lastRecordId.get();
+    }
+
+    public MemoryCacheEntry load(long id) {
+        MemoryCacheEntry rc = cache.get(id);
+        if (rc != null) {
+            rc.load();
+        }
+        return rc;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+        this.systemUsage = destination.getSystemUsage();
+    }
+
+    public void setAutoRemove(boolean autoRemove) {
+        this.autoRemove = autoRemove;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+
+/**
+ * 
+ * @author chirino
+ */
+public class MemoryDataStoreManager implements DataStoreManager {
+
+    private final MemoryStoreManager<DataStore> stores = new MemoryStoreManager<DataStore>() {
+        @Override
+        protected DataStore createStore(String name) {
+            return new MemoryDataStore(name);
+        }
+
+        @Override
+        protected void destroyStore(DataStore store) {
+        }
+    };
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public DataStore addStore(String name) throws Exception {
+        return stores.addStore(name);
+    }
+
+    public DataStore getStore(String name) throws Exception {
+        return stores.getStore(name);
+    }
+
+    public List<DataStore> getStores() throws Exception {
+        return stores.getStores();
+    }
+
+    public void removeStore(DataStore store) throws Exception {
+        stores.removeStore(store);
+    }
+
+    public void record(TransactionInfo txOperation, Runnable onComplete) throws Exception {
+    }
+
+    public Map<TransactionId, List<TransactionAction>> recoverPendingTransactions() throws Exception {
+        return new HashMap<TransactionId, List<TransactionAction>>();
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.broker.router.store.memory.MemoryDataStore.MemoryCacheEntry;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * 
+ * @author chirino
+ */
+public class MemoryReferenceStore extends MemoryStore implements ReferenceStore {
+
+    private final MemoryDataStore dataStore;
+    private final LinkedList<ReferenceCacheEntry> references = new LinkedList<ReferenceCacheEntry>();
+    private ReferenceCacheEntry lastRecordId;
+    private long lastSequenceId;
+
+    static class ReferenceCacheEntry implements CacheEntry {
+        final MemoryCacheEntry cacheEntry;
+        final long sequenceId;
+
+        public ReferenceCacheEntry(MemoryCacheEntry cacheEntry, long sequenceId) {
+            this.cacheEntry = cacheEntry;
+            this.sequenceId = sequenceId;
+        }
+
+        public long getId() {
+            return cacheEntry.getId();
+        }
+
+        public Message getMessage() {
+            return cacheEntry.getMessage();
+        }
+
+        public DataStore getStore() {
+            return cacheEntry.getStore();
+        }
+
+        public void incrementRedeliveryCounter() {
+            cacheEntry.incrementRedeliveryCounter();
+        }
+
+        public void load() {
+            cacheEntry.load();
+        }
+
+        public boolean lock() {
+            return cacheEntry.lock();
+        }
+
+        public void unload() {
+            cacheEntry.unload();
+        }
+
+        public void unlock() {
+            cacheEntry.unlock();
+        }
+    }
+
+    public MemoryReferenceStore(String name, MemoryDataStore dataStore) {
+        super(name);
+        this.dataStore = dataStore;
+    }
+
+    public void addReference(CacheEntry cacheEntry) throws Exception {
+        MemoryCacheEntry mce = (MemoryCacheEntry) cacheEntry;
+        mce.reference();
+        synchronized (this) {
+            lastRecordId = new ReferenceCacheEntry(mce, ++lastSequenceId);
+            references.add(lastRecordId);
+        }
+    }
+
+    public List<CacheEntry> load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception {
+
+        ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE;
+        ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE;
+
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(max);
+        synchronized (this) {
+
+            boolean inRange = first == null ? true : false;
+
+            for (Iterator<ReferenceCacheEntry> iterator = references.iterator(); iterator.hasNext();) {
+                ReferenceCacheEntry record = iterator.next();
+
+                // Don't load more than the max messages.
+                if (rc.size() == max) {
+                    break;
+                }
+                if (inRange) {
+                    if (last == null || record.sequenceId <= last.sequenceId) {
+                        if (record.cacheEntry.loadUnlessDropped()) {
+                            rc.add(record);
+                        } else {
+                            iterator.remove();
+                        }
+                    } else {
+                        // Don't load past the last message.
+                        break;
+                    }
+                } else {
+                    // Don't load before the first message..
+                    if (first == null || record.sequenceId > first.sequenceId) {
+                        inRange = true;
+                        if (record.cacheEntry.loadUnlessDropped()) {
+                            rc.add(record);
+                        } else {
+                            iterator.remove();
+                        }
+                    }
+                }
+            }
+
+        }
+
+        return rc;
+    }
+
+    public List<CacheEntry> remove(CacheEntry firstCE, CacheEntry lastCE, int max) {
+        ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE;
+        ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE;
+
+        ArrayList<CacheEntry> rc = new ArrayList<CacheEntry>(max);
+        if (rc.size() == max) {
+            return rc;
+        }
+
+        synchronized (this) {
+
+            boolean inRange = first == null ? true : false;
+            for (Iterator<ReferenceCacheEntry> iterator = references.iterator(); iterator.hasNext();) {
+                ReferenceCacheEntry record = iterator.next();
+
+                if (inRange) {
+                    // Don't load more than the max messages.
+                    if (rc.size() == max) {
+                        break;
+                    }
+                    if (last == null || record.sequenceId < last.sequenceId) {
+                        if (record.cacheEntry.loadUnlessDropped()) {
+                            record.cacheEntry.unreference();
+                            rc.add(record);
+                        }
+                        iterator.remove();
+                    } else {
+                        // Don't load past the last message.
+                        break;
+                    }
+                } else {
+                    // Don't load before the first message..
+                    if (record.sequenceId > first.sequenceId) {
+                        inRange = true;
+                        if (record.cacheEntry.loadUnlessDropped()) {
+                            record.cacheEntry.unreference();
+                            rc.add(record);
+                        }
+                        iterator.remove();
+                    }
+                }
+            }
+        }
+        return rc;
+    }
+
+    public long size() throws Exception {
+        return references.size();
+    }
+
+    public CacheEntry getLastAddedEntry() throws Exception {
+        return lastRecordId;
+    }
+
+    public void remove(long id, Runnable onCompleted) throws Exception {
+        synchronized (this) {
+            for (Iterator<ReferenceCacheEntry> iterator = references.iterator(); iterator.hasNext();) {
+                ReferenceCacheEntry ce = iterator.next();
+                if (ce.getId() == id) {
+                    ce.cacheEntry.unreference();
+                    iterator.remove();
+                    break;
+                }
+            }
+        }
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+    }
+
+    public RemoveReferenceTransactionAction removeReferenceTransacted(final long id, final TransactionId tx, Runnable onCompleted) throws Exception {
+        if (onCompleted != null) {
+            onCompleted.run();
+        }
+        return new RemoveReferenceTransactionAction() {
+
+            public long getId() {
+                return id;
+            }
+
+            public ReferenceStore getReferenceStore() {
+                return MemoryReferenceStore.this;
+            }
+
+            public DataStore getDataStore() {
+                return dataStore;
+            }
+
+            public TransactionId getTransactionId() {
+                return tx;
+            }
+
+            public void complete() throws Exception {
+                remove(id, null);
+            }
+        };
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.router.store.api.Store;
+
+/**
+ * 
+ * @author chirino
+ */
+abstract public class MemoryStore implements Store {
+
+    private String name;
+    private Map<String, String> properties;
+
+    public MemoryStore(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setProperties(Map<String, String> properties) throws Exception {
+        this.properties = properties;
+    }
+
+    public Map<String, String> getProperties() throws Exception {
+        return properties;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.activemq.broker.router.store.api.Store;
+import org.apache.activemq.broker.router.store.api.StoreManager;
+
+/**
+ * 
+ * @author chirino
+ */
+abstract public class MemoryStoreManager<T extends Store> implements StoreManager<T> {
+
+    private HashMap<String, T> stores = new HashMap<String, T>();
+
+    synchronized public T addStore(String name) throws Exception {
+        T rc = stores.get(name);
+        if (rc == null) {
+            rc = createStore(name);
+            stores.put(name, rc);
+        }
+        return rc;
+    }
+
+    abstract protected T createStore(String name);
+
+    abstract protected void destroyStore(T store);
+
+    synchronized public T getStore(String name) throws Exception {
+        return stores.get(name);
+    }
+
+    synchronized public List<T> getStores() throws Exception {
+        ArrayList<T> rc = new ArrayList<T>();
+        rc.addAll(stores.values());
+        return rc;
+    }
+
+    synchronized public void removeStore(T store) throws Exception {
+        stores.remove(store.getName());
+        destroyStore(store);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,28 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A non-persistent implementation of the org.apache.activemq.broker.router.store.api package.  
+All messages put in this store are held in ram and count against the Destinations's MemoryUsage.
+
+TODO: subclass this implementation so that we can spool old messages off to disk.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Utility class used to check for duplicate and miss messages. Keeps track of a
+ * SequenceSet per producer.
+ * 
+ * @author chirino
+ */
+public class DuplicateAndMissedChecker {
+
+    final HashMap<String, SequenceSet> sequences = new HashMap<String, SequenceSet>();
+    final ArrayList<OrderChecker> orderCheckers = new ArrayList<OrderChecker>();
+
+    public void onMessageId(MessageId messageId) {
+        String pid = messageId.getProducerId().toString();
+        long id = messageId.getProducerSequenceId();
+        onMessage(pid, id);
+    }
+
+    /**
+     * 
+     * @param pid
+     * @param id
+     * @throws IllegalArgumentException
+     *             is the message id is a duplicate
+     */
+    public void onMessage(String pid, long id) throws IllegalArgumentException {
+        SequenceSet sequenceSet = getSequenceSet(pid);
+        if (!sequenceSet.add(id)) {
+            throw new IllegalArgumentException("Message from producer: " + pid + " was a duplicate: " + id);
+        }
+    }
+
+    private SequenceSet getSequenceSet(String pid) {
+        SequenceSet sequenceSet;
+        synchronized (sequences) {
+            sequenceSet = sequences.get(pid);
+            if (sequenceSet == null) {
+                sequenceSet = new SequenceSet();
+                sequences.put(pid, sequenceSet);
+            }
+        }
+        return sequenceSet;
+    }
+
+    public void addOrderChecker(OrderChecker checker) {
+        synchronized (orderCheckers) {
+            orderCheckers.add(checker);
+        }
+    }
+
+    public void removeOrderChecker(OrderChecker checker) {
+        synchronized (orderCheckers) {
+            orderCheckers.remove(checker);
+        }
+    }
+
+    public Map<String, List<Sequence>> findMissingMessages() {
+
+        Map<String, List<Sequence>> missing = new HashMap<String, List<Sequence>>();
+
+        HashMap<String, SequenceSet> sequences;
+        synchronized (this.sequences) {
+            sequences = new HashMap<String, SequenceSet>(this.sequences);
+        }
+
+        for (Map.Entry<String, SequenceSet> entry : sequences.entrySet()) {
+            Long firstId = null;
+            synchronized (orderCheckers) {
+                for (OrderChecker checker : orderCheckers) {
+                    Long id = checker.getLastIdForProducer(entry.getKey());
+                    if (id != null) {
+                        if (firstId == null) {
+                            firstId = id;
+                        } else {
+                            if (id.compareTo(firstId) <= 0) {
+                                firstId = id;
+                            }
+                        }
+                    } else {
+                        // We can't reliably know the minum last value that we
+                        // should have received
+                        // until all the consumer report at least 1 received
+                        // message.
+                        firstId = null;
+                        break;
+                    }
+                }
+            }
+            if (firstId != null) {
+                missing.put(entry.getKey(), entry.getValue().getMissing(1, firstId));
+            }
+        }
+
+        return missing;
+    }
+
+    public Map<String, List<Sequence>> getMissingMessages(long first, long last) {
+
+        HashMap<String, SequenceSet> sequences;
+        synchronized (this.sequences) {
+            sequences = new HashMap<String, SequenceSet>(this.sequences);
+        }
+
+        Map<String, List<Sequence>> missing = new HashMap<String, List<Sequence>>();
+        for (Map.Entry<String, SequenceSet> entry : sequences.entrySet()) {
+            missing.put(entry.getKey(), entry.getValue().getMissing(first, last));
+        }
+        return missing;
+    }
+
+    public Map<String, List<Sequence>> getReceivedMessages() {
+        Map<String, List<Sequence>> received = new HashMap<String, List<Sequence>>();
+
+        HashMap<String, SequenceSet> sequences;
+        synchronized (this.sequences) {
+            sequences = new HashMap<String, SequenceSet>(this.sequences);
+        }
+        for (Map.Entry<String, SequenceSet> entry : sequences.entrySet()) {
+            received.put(entry.getKey(), entry.getValue().getReceived());
+        }
+        return received;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+/**
+ * Provides a base class for you to extend when you want object to maintain a
+ * doubly linked list to other objects without using a collection class.
+ * 
+ * @author chirino
+ */
+public class LinkedNode<T extends LinkedNode<T>> {
+
+    protected LinkedNodeList<T> list;
+    protected T next;
+    protected T prev;
+
+    public LinkedNode() {
+    }
+
+    @SuppressWarnings("unchecked")
+    private T getThis() {
+        return (T) this;
+    }
+
+    public T getHeadNode() {
+        return list.head;
+    }
+
+    public T getTailNode() {
+        return list.head.prev;
+    }
+
+    public T getNext() {
+        return isTailNode() ? null : next;
+    }
+
+    public T getPrevious() {
+        return isHeadNode() ? null : prev;
+    }
+
+    public T getNextCircular() {
+        return next;
+    }
+
+    public T getPreviousCircular() {
+        return prev;
+    }
+
+    public boolean isHeadNode() {
+        return list.head == this;
+    }
+
+    public boolean isTailNode() {
+        return list.head.prev == this;
+    }
+
+    /**
+     * @param node
+     *            the node to link after this node.
+     * @return this
+     */
+    public void linkAfter(T node) {
+        if (node == this) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (node.list != null) {
+            throw new IllegalArgumentException("You only insert nodes that are not in a list");
+        }
+        if (list == null) {
+            throw new IllegalArgumentException("This node is not yet in a list");
+        }
+
+        node.list = list;
+
+        // given we linked this<->next and are inserting node in between
+        node.prev = getThis(); // link this<-node
+        node.next = next; // link node->next
+        next.prev = node; // link node<-next
+        next = node; // this->node
+        list.size++;
+    }
+
+    /**
+     * @param rightList
+     *            the node to link after this node.
+     * @return this
+     */
+    public void linkAfter(LinkedNodeList<T> rightList) {
+
+        if (rightList == list) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (list == null) {
+            throw new IllegalArgumentException("This node is not yet in a list");
+        }
+
+        T rightHead = rightList.head;
+        T rightTail = rightList.head.prev;
+        list.reparent(rightList);
+
+        // given we linked this<->next and are inserting list in between
+        rightHead.prev = getThis(); // link this<-list
+        rightTail.next = next; // link list->next
+        next.prev = rightTail; // link list<-next
+        next = rightHead; // this->list
+        list.size++;
+    }
+
+    /**
+     * @param node
+     *            the node to link after this node.
+     * @return
+     * @return this
+     */
+    public void linkBefore(T node) {
+
+        if (node == this) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (node.list != null) {
+            throw new IllegalArgumentException("You only insert nodes that are not in a list");
+        }
+        if (list == null) {
+            throw new IllegalArgumentException("This node is not yet in a list");
+        }
+
+        node.list = list;
+
+        // given we linked prev<->this and are inserting node in between
+        node.next = getThis(); // node->this
+        node.prev = prev; // prev<-node
+        prev.next = node; // prev->node
+        prev = node; // node<-this
+
+        if (this == list.head) {
+            list.head = node;
+        }
+        list.size++;
+    }
+
+    /**
+     * @param leftList
+     *            the node to link after this node.
+     * @return
+     * @return this
+     */
+    public void linkBefore(LinkedNodeList<T> leftList) {
+
+        if (leftList == list) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (list == null) {
+            throw new IllegalArgumentException("This node is not yet in a list");
+        }
+
+        T leftHead = leftList.head;
+        T leftTail = leftList.head.prev;
+        list.reparent(leftList);
+
+        // given we linked prev<->this and are inserting list in between
+        leftTail.next = getThis(); // list->this
+        leftHead.prev = prev; // prev<-list
+        prev.next = leftHead; // prev->list
+        prev = leftTail; // list<-this
+
+        if (isHeadNode()) {
+            list.head = leftHead;
+        }
+    }
+
+    public void linkToTail(LinkedNodeList<T> target) {
+        if (list != null) {
+            throw new IllegalArgumentException("This node is already linked to a node");
+        }
+
+        if (target.head == null) {
+            next = prev = target.head = getThis();
+            list = target;
+            list.size++;
+        } else {
+            target.head.prev.linkAfter(getThis());
+        }
+    }
+
+    public void linkToHead(LinkedNodeList<T> target) {
+        if (list != null) {
+            throw new IllegalArgumentException("This node is already linked to a node");
+        }
+
+        if (target.head == null) {
+            next = prev = target.head = getThis();
+            list = target;
+            list.size++;
+        } else {
+            target.head.linkBefore(getThis());
+        }
+    }
+
+    /**
+     * Removes this node out of the linked list it is chained in.
+     */
+    public void unlink() {
+
+        // If we are allready unlinked...
+        if (list == null) {
+            return;
+        }
+
+        if (getThis() == prev) {
+            // We are the only item in the list
+            list.head = null;
+        } else {
+            // given we linked prev<->this<->next
+            next.prev = prev; // prev<-next
+            prev.next = next; // prev->next
+
+            if (isHeadNode()) {
+                list.head = next;
+            }
+        }
+        list.size--;
+        list = null;
+    }
+
+    /**
+     * Splits the list into 2 lists. This node becomes the tail of this list.
+     * Then 2nd list is returned.
+     * 
+     * @return An empty list if this is a tail node.
+     */
+    public LinkedNodeList<T> splitAfter() {
+
+        if (isTailNode()) {
+            return new LinkedNodeList<T>();
+        }
+
+        // Create the new list
+        LinkedNodeList<T> newList = new LinkedNodeList<T>();
+        newList.head = next;
+
+        // Update the head and tail of the new list so that they point to each
+        // other.
+        newList.head.prev = list.head.prev; // new list: tail<-head
+        newList.head.prev.next = newList.head; // new list: tail->head
+        next = list.head; // old list: tail->head
+        list.head.prev = getThis(); // old list: tail<-head
+
+        // Update all the nodes in the new list so that they know of their new
+        // list owner.
+        T n = newList.head;
+        newList.size++;
+        list.size--;
+        do {
+            n.list = newList;
+            n = n.next;
+            newList.size++;
+            list.size--;
+        } while (n != newList.head);
+
+        return newList;
+    }
+
+    /**
+     * Splits the list into 2 lists. This node becomes the head of this list.
+     * Then 2nd list is returned.
+     * 
+     * @return An empty list if this is a head node.
+     */
+    public LinkedNodeList<T> splitBefore() {
+
+        if (isHeadNode()) {
+            return new LinkedNodeList<T>();
+        }
+
+        // Create the new list
+        LinkedNodeList<T> newList = new LinkedNodeList<T>();
+        newList.head = list.head;
+        list.head = getThis();
+
+        T newListTail = prev;
+
+        prev = newList.head.prev; // old list: tail<-head
+        prev.next = getThis(); // old list: tail->head
+        newList.head.prev = newListTail; // new list: tail<-head
+        newListTail.next = newList.head; // new list: tail->head
+
+        // Update all the nodes in the new list so that they know of their new
+        // list owner.
+        T n = newList.head;
+        newList.size++;
+        list.size--;
+        do {
+            n.list = newList;
+            n = n.next;
+            newList.size++;
+            list.size--;
+        } while (n != newList.head);
+
+        return newList;
+    }
+
+    public boolean isLinked() {
+        return list != null;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+/**
+ * Provides a list of LinkedNode objects. 
+ * 
+ * @author chirino
+ */
+public class LinkedNodeList<T extends LinkedNode<T>> {
+
+    T head;
+    int size;
+
+    public LinkedNodeList() {
+    }
+
+    public boolean isEmpty() {
+        return head == null;
+    }
+
+    public void addLast(T node) {
+        node.linkToTail(this);
+    }
+
+    public void addFirst(T node) {
+        node.linkToHead(this);
+    }
+
+    public T getHead() {
+        return head;
+    }
+
+    public T getTail() {
+        return head.prev;
+    }
+
+    public void addLast(LinkedNodeList<T> list) {
+        if (list.isEmpty()) {
+            return;
+        }
+        if (head == null) {
+            reparent(list);
+            head = list.head;
+            list.head = null;
+        } else {
+            getTail().linkAfter(list);
+        }
+    }
+
+    public void addFirst(LinkedNodeList<T> list) {
+        if (list.isEmpty()) {
+            return;
+        }
+        if (head == null) {
+            reparent(list);
+            head = list.head;
+            list.head = null;
+        } else {
+            getHead().linkBefore(list);
+        }
+    }
+
+    public T reparent(LinkedNodeList<T> list) {
+        size += list.size;
+        T n = list.head;
+        do {
+            n.list = this;
+            n = n.next;
+        } while (n != list.head);
+        list.head = null;
+        list.size = 0;
+        return n;
+    }
+
+    /**
+     * Move the head to the tail and returns the new head node.
+     * 
+     * @return
+     */
+    public T rotate() {
+        return head = head.getNext();
+    }
+
+    public int size() {
+        return size;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.HashMap;
+
+import org.apache.activemq.command.MessageId;
+
+/**
+ * 
+ * @author chirino
+ */
+public class OrderChecker {
+    final DuplicateAndMissedChecker next;
+    final HashMap<String, Long> lastMessageIds = new HashMap<String, Long>();
+
+    public OrderChecker() {
+        this.next = null;
+    }
+
+    public OrderChecker(DuplicateAndMissedChecker next) {
+        this.next = next;
+        this.next.addOrderChecker(this);
+    }
+
+    public void onMessageId(MessageId messageId) {
+        String pid = messageId.getProducerId().toString();
+        long id = messageId.getProducerSequenceId();
+        onMessage(pid, id);
+    }
+
+    public void onMessage(String pid, long id) throws IllegalArgumentException {
+        synchronized (lastMessageIds) {
+            Long last = lastMessageIds.get(pid);
+            if (last != null) {
+                if (id <= last) {
+                    throw new IllegalArgumentException("Got out of order message from producer: " + pid + " got " + id + ", expected a message larger than " + last);
+                }
+            }
+            lastMessageIds.put(pid, id);
+        }
+        if (next != null) {
+            next.onMessage(pid, id);
+        }
+    }
+
+    Long getLastIdForProducer(String pid) {
+        synchronized (lastMessageIds) {
+            return lastMessageIds.get(pid);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface Selectable<T> {
+
+    public T getValue();
+
+    public void setEnabled(boolean enabled);
+
+    public void setProcessed(boolean enabled);
+
+    public void close();
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 
+ * @author chirino
+ */
+public class Selector<T> {
+
+    final Object mutex = new Object() {
+    };
+
+    final LinkedNodeList<SelectableImpl> enabled = new LinkedNodeList<SelectableImpl>();
+    final LinkedNodeList<SelectableImpl> disabled = new LinkedNodeList<SelectableImpl>();
+    Runnable runOnWakeup;
+
+    private class SelectableImpl extends LinkedNode<SelectableImpl> implements Selectable<T> {
+
+        private T value;
+        private boolean on;
+        private boolean processed = true;
+
+        public SelectableImpl(T value) {
+            this.value = value;
+        }
+
+        public T getValue() {
+            return value;
+        }
+
+        public void setEnabled(boolean on) {
+            synchronized (mutex) {
+                if (this.on == on) {
+                    return;
+                }
+                this.on = on;
+                unlink();
+                linkSelector();
+            }
+        }
+
+        public void setProcessed(boolean processed) {
+            synchronized (mutex) {
+                if (this.processed == processed) {
+                    return;
+                }
+                this.processed = processed;
+                unlink();
+                linkSelector();
+            }
+        }
+
+        public void close() {
+            synchronized (mutex) {
+                unlink();
+            }
+        }
+
+        private void linkSelector() {
+            if (on && processed) {
+                enabled.addLast(this);
+                wakeup();
+            } else {
+                disabled.addLast(this);
+            }
+        }
+
+    }
+
+    public Selectable<T> create(T value) {
+        SelectableImpl rc = new SelectableImpl(value);
+        synchronized (mutex) {
+            rc.linkSelector();
+        }
+        return rc;
+    }
+
+    /**
+     * 
+     * @return
+     */
+    public List<Selectable<T>> pollNoWait() {
+        synchronized (mutex) {
+            ArrayList<Selectable<T>> rc = new ArrayList<Selectable<T>>();
+            SelectableImpl curr = enabled.getHead();
+            while (curr != null) {
+                curr.on = false;
+                curr.processed = false;
+                rc.add(curr);
+                curr = curr.getNext();
+            }
+            disabled.addLast(enabled);
+            return rc;
+        }
+    }
+
+    public void wakeup() {
+        Runnable wakeup;
+        synchronized (mutex) {
+            mutex.notifyAll();
+            wakeup = runOnWakeup;
+        }
+        if (wakeup != null) {
+            wakeup.run();
+        }
+    }
+
+    public Runnable getRunOnWakeup() {
+        synchronized (mutex) {
+            return runOnWakeup;
+        }
+    }
+
+    public void setRunOnWakeup(Runnable runOnWakeup) {
+        synchronized (mutex) {
+            this.runOnWakeup = runOnWakeup;
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A thread pool which polls a list of Selectors to find runnable tasks to
+ * execute. All enabled tasks in a Selector are executed before servicing tasks
+ * in a subsequent selector. This allows you prioritize tasks by adding them to
+ * a selector ahead of another one.
+ */
+public class SelectorThreadPool implements Runnable {
+
+    private final Object workMutex = new Object() {
+    };
+    private boolean workAvailable;
+
+    private final Object selectorsCowMutex = new Object();
+    private ArrayList<Selector<? extends Runnable>> selectors = new ArrayList<Selector<? extends Runnable>>();
+
+    private final String name;
+    private AtomicBoolean run = new AtomicBoolean(true);
+
+    private ThreadPoolExecutor threadPool;
+    private final Runnable runOnWakeupTask = new Runnable() {
+        public void run() {
+            wakeupPoller();
+        }
+    };
+
+    AtomicLong nextId = new AtomicLong();
+
+    public SelectorThreadPool(final String name, int min, int max) {
+        this.name = name;
+        threadPool = new ThreadPoolExecutor(min, max, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+        threadPool.setThreadFactory(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread rc = new Thread(runnable, name + ": worker " + nextId.incrementAndGet());
+                return rc;
+            }
+        });
+    }
+
+    public void add(Selector<? extends Runnable> selector) {
+        selector.setRunOnWakeup(runOnWakeupTask);
+        synchronized (selectorsCowMutex) {
+            selectors = new ArrayList<Selector<? extends Runnable>>(selectors);
+            selectors.add(selector);
+        }
+        wakeupPoller();
+    }
+
+    public void remove(Selector<? extends Runnable> selector) {
+        synchronized (selectorsCowMutex) {
+            selectors = new ArrayList<Selector<? extends Runnable>>(selectors);
+            if (selectors.remove(selector)) {
+                selector.setRunOnWakeup(null);
+            }
+        }
+    }
+
+    private ArrayList<Selector<? extends Runnable>> getSelectors() {
+        synchronized (selectorsCowMutex) {
+            return selectors;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void run() {
+        try {
+            while (run.get()) {
+                synchronized (workMutex) {
+                    workAvailable = false;
+                }
+
+                int counter = 0;
+                ArrayList<Selector<? extends Runnable>> selectors = getSelectors();
+                for (Selector selector : selectors) {
+                    List<Selectable> runnables = selector.pollNoWait();
+                    for (Selectable selectable : runnables) {
+                        final Selectable target = selectable;
+                        Runnable wrapper = new Runnable() {
+                            public void run() {
+                                try {
+                                    ((Runnable) target.getValue()).run();
+                                } catch (Throwable e) {
+                                    // TODO: add logging
+                                    e.printStackTrace();
+                                } finally {
+                                    target.setProcessed(true);
+                                    wakeupPoller();
+                                }
+                            }
+
+                        };
+                        try {
+                            threadPool.execute(wrapper);
+                            counter++;
+                        } catch (RejectedExecutionException e) {
+                            // If we could not allocate a thread to
+                            // do that work.. then keep it enabled.
+                            selectable.setEnabled(true);
+                            break;
+                        }
+                    }
+                }
+                if (counter == 0) {
+                    synchronized (workMutex) {
+                        while (!workAvailable && run.get()) {
+                            workMutex.wait();
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            // Someone wants this thread to die.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void wakeupPoller() {
+        synchronized (workMutex) {
+            if (!workAvailable) {
+                workAvailable = true;
+                workMutex.notify();
+            }
+        }
+    }
+
+    public void start() {
+        run.set(true);
+        Thread thread = new Thread(this, name + ": poller");
+        thread.start();
+    }
+
+    public void stop() {
+        run.set(false);
+        wakeupPoller();
+        threadPool.shutdownNow();
+    }
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+/**
+ * Represents a range of numbers.
+ * 
+ * @author chirino
+ */
+public class Sequence extends LinkedNode<Sequence> {
+    long first;
+    long last;
+
+    public Sequence(long value) {
+        first = last = value;
+    }
+
+    public Sequence(long first, long last) {
+        this.first = first;
+        this.last = last;
+    }
+
+    public boolean isAdjacentToLast(long value) {
+        return last + 1 == value;
+    }
+
+    public boolean isAdjacentToFirst(long value) {
+        return first - 1 == value;
+    }
+
+    public boolean contains(long value) {
+        return first <= value && value <= last;
+    }
+
+    @Override
+    public String toString() {
+        return first == last ? "" + first : first + "-" + last;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Keeps track of a added long values. Collapses ranges of numbers using a
+ * Sequence representation. Use to keep track of received message ids to find
+ * out if a message is duplicate or if there are any missing messages.
+ * 
+ * @author chirino
+ */
+public class SequenceSet {
+    LinkedNodeList<Sequence> sequences = new LinkedNodeList<Sequence>();
+
+    /**
+     * 
+     * @param value
+     *            the value to add to the list
+     * @return false if the value was a duplicate.
+     */
+    synchronized public boolean add(long value) {
+
+        if (sequences.isEmpty()) {
+            sequences.addFirst(new Sequence(value));
+            return true;
+        }
+
+        Sequence sequence = sequences.getHead();
+        while (sequence != null) {
+
+            if (sequence.isAdjacentToLast(value)) {
+                // grow the sequence...
+                sequence.last = value;
+                // it might connect us to the next sequence..
+                if (sequence.getNext() != null) {
+                    Sequence next = sequence.getNext();
+                    if (next.isAdjacentToFirst(value)) {
+                        // Yep the sequence connected.. so join them.
+                        sequence.last = next.last;
+                        next.unlink();
+                    }
+                }
+                return true;
+            }
+
+            if (sequence.isAdjacentToFirst(value)) {
+                // grow the sequence...
+                sequence.first = value;
+
+                // it might connect us to the previous
+                if (sequence.getPrevious() != null) {
+                    Sequence prev = sequence.getPrevious();
+                    if (prev.isAdjacentToLast(value)) {
+                        // Yep the sequence connected.. so join them.
+                        sequence.first = prev.first;
+                        prev.unlink();
+                    }
+                }
+                return true;
+            }
+
+            // Did that value land before this sequence?
+            if (value < sequence.first) {
+                // Then insert a new entry before this sequence item.
+                sequence.linkBefore(new Sequence(value));
+                return true;
+            }
+
+            // Did that value land within the sequence? The it's a duplicate.
+            if (sequence.contains(value)) {
+                return false;
+            }
+
+            sequence = sequence.getNext();
+        }
+
+        // Then the value is getting appended to the tail of the sequence.
+        sequences.addLast(new Sequence(value));
+        return true;
+    }
+
+    /**
+     * @return all the id Sequences that are missing from this set that are not
+     *         in between the range provided.
+     */
+    synchronized public List<Sequence> getMissing(long first, long last) {
+        ArrayList<Sequence> rc = new ArrayList<Sequence>();
+        if (first > last) {
+            throw new IllegalArgumentException("First cannot be more than last");
+        }
+        if (sequences.isEmpty()) {
+            // We are missing all the messages.
+            rc.add(new Sequence(first, last));
+            return rc;
+        }
+
+        Sequence sequence = sequences.getHead();
+        while (sequence != null && first <= last) {
+            if (sequence.contains(first)) {
+                first = sequence.last + 1;
+            } else {
+                if (first < sequence.first) {
+                    if (last < sequence.first) {
+                        rc.add(new Sequence(first, last));
+                        return rc;
+                    } else {
+                        rc.add(new Sequence(first, sequence.first - 1));
+                        first = sequence.last + 1;
+                    }
+                }
+            }
+            sequence = sequence.getNext();
+        }
+
+        if (first <= last) {
+            rc.add(new Sequence(first, last));
+        }
+        return rc;
+    }
+
+    /**
+     * @return all the Sequence that are in this list
+     */
+    synchronized public List<Sequence> getReceived() {
+        ArrayList<Sequence> rc = new ArrayList<Sequence>(sequences.size());
+        Sequence sequence = sequences.getHead();
+        while (sequence != null) {
+            rc.add(new Sequence(sequence.first, sequence.last));
+            sequence = sequence.getNext();
+        }
+        return rc;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+
+/**
+ * Used to keep track of how much of something is being used so that a
+ * productive working set usage can be controlled. Main use case is manage
+ * memory usage.
+ * 
+ * @org.apache.xbean.XBean
+ * @version $Revision: 1.3 $
+ */
+public class Usage implements Service {
+
+    protected final Object usageMutex = new Object();
+    protected int percentUsage;
+    private int percentUsageMinDelta = 1;
+    private String name;
+    private AtomicBoolean started = new AtomicBoolean();
+    private long usage;
+    private long limit;
+
+    public Usage() {
+        this("default");
+    }
+
+    public Usage(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @throws InterruptedException
+     */
+    public void waitForSpace() throws InterruptedException {
+        synchronized (usageMutex) {
+            for (int i = 0; percentUsage >= 100; i++) {
+                usageMutex.wait();
+            }
+        }
+    }
+
+    /**
+     * @param timeout
+     * @throws InterruptedException
+     * @return true if space
+     */
+    public boolean waitForSpace(long timeout) throws InterruptedException {
+        synchronized (usageMutex) {
+            if (percentUsage >= 100) {
+                usageMutex.wait(timeout);
+            }
+            return percentUsage < 100;
+        }
+    }
+
+    public boolean isFull() {
+        synchronized (usageMutex) {
+            return percentUsage >= 100;
+        }
+    }
+
+    /**
+     * Tries to increase the usage by value amount but blocks if this object is
+     * currently full.
+     * 
+     * @param value
+     * @throws InterruptedException
+     */
+    public void enqueueUsage(long value) throws InterruptedException {
+        waitForSpace();
+        increaseUsage(value);
+    }
+
+    /**
+     * Increases the usage by the value amount.
+     * 
+     * @param value
+     */
+    public void increaseUsage(long value) {
+        if (value == 0) {
+            return;
+        }
+        int percentUsage;
+        synchronized (usageMutex) {
+            usage += value;
+            percentUsage = caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    /**
+     * Decreases the usage by the value amount.
+     * 
+     * @param value
+     */
+    public void decreaseUsage(long value) {
+        if (value == 0) {
+            return;
+        }
+        int percentUsage;
+        synchronized (usageMutex) {
+            usage -= value;
+            percentUsage = caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    protected long retrieveUsage() {
+        return usage;
+    }
+
+    public long getUsage() {
+        synchronized (usageMutex) {
+            return usage;
+        }
+    }
+
+    public void setUsage(long usage) {
+        synchronized (usageMutex) {
+            this.usage = usage;
+        }
+    }
+
+    public long getLimit() {
+        synchronized (usageMutex) {
+            return limit;
+        }
+    }
+
+    /**
+     * Sets the memory limit in bytes. Setting the limit in bytes will set the
+     * usagePortion to 0 since the UsageManager is not going to be portion based
+     * off the parent. When set using XBean, you can use values such as: "20
+     * mb", "1024 kb", or "1 gb"
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
+    public void setLimit(long limit) {
+        if (percentUsageMinDelta < 0) {
+            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
+        }
+        synchronized (usageMutex) {
+            this.limit = limit;
+        }
+        onLimitChange();
+    }
+
+    protected void onLimitChange() {
+        // Reset the percent currently being used.
+        int percentUsage;
+        synchronized (usageMutex) {
+            percentUsage = caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    /*
+     * Sets the minimum number of percentage points the usage has to change
+     * before a UsageListener event is fired by the manager.
+     */
+    public int getPercentUsage() {
+        synchronized (usageMutex) {
+            return percentUsage;
+        }
+    }
+
+    public int getPercentUsageMinDelta() {
+        synchronized (usageMutex) {
+            return percentUsageMinDelta;
+        }
+    }
+
+    /**
+     * Sets the minimum number of percentage points the usage has to change
+     * before a UsageListener event is fired by the manager.
+     * 
+     * @param percentUsageMinDelta
+     */
+    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
+        if (percentUsageMinDelta < 1) {
+            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
+        }
+        int percentUsage;
+        synchronized (usageMutex) {
+            this.percentUsageMinDelta = percentUsageMinDelta;
+            percentUsage = caclPercentUsage();
+        }
+        setPercentUsage(percentUsage);
+    }
+
+    protected void setPercentUsage(int value) {
+        synchronized (usageMutex) {
+            int oldValue = percentUsage;
+            percentUsage = value;
+            if (oldValue != value) {
+                fireEvent(oldValue, value);
+            }
+        }
+    }
+
+    protected int caclPercentUsage() {
+        if (limit == 0) {
+            return 0;
+        }
+        return (int) ((((retrieveUsage() * 100) / limit) / percentUsageMinDelta) * percentUsageMinDelta);
+    }
+
+    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
+        if (started.get()) {
+            // Switching from being full to not being full..
+            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+                synchronized (usageMutex) {
+                    usageMutex.notifyAll();
+                    // for (Iterator<Runnable> iter = new
+                    // ArrayList<Runnable>(callbacks).iterator();
+                    // iter.hasNext();) {
+                    // Runnable callback = iter.next();
+                    // getExecutor().execute(callback);
+                    // }
+                    // callbacks.clear();
+                }
+            }
+            // Let the listeners know on a separate thread
+            // Runnable listenerNotifier = new Runnable() {
+            //            
+            // public void run() {
+            // for (Iterator<UsageListener> iter = listeners.iterator();
+            // iter.hasNext();) {
+            // UsageListener l = iter.next();
+            // l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+            // }
+            // }
+            //            
+            // };
+            // getExecutor().execute(listenerNotifier);
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String toString() {
+        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limit + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
+    }
+
+    public void start() {
+        started.set(true);
+    }
+
+    public void stop() {
+        if (started.compareAndSet(true, false)) {
+            synchronized (usageMutex) {
+                usageMutex.notifyAll();
+            }
+        }
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+}



Mime
View raw message