Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html Tue Mar 4 13:01:41 2008 @@ -0,0 +1,25 @@ + + + + + + +Provides the Journal interface which is a simple facade to an AsyncDataManager. It handles encoding and decoding all the journal entries stored in the AsyncDataManager. + + + Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html Tue Mar 4 13:01:41 2008 @@ -0,0 +1,28 @@ + + + + + + +A persistent implementation of the org.apache.activemq.broker.router.store.api package. Operations are appended to a journal log file which allows +for very fast concurrent operations. It makes use of an implementation of the org.apache.activemq.broker.router.index.api to actually track what messages are in which +destination. The interface to org.apache.activemq.broker.router.index.api is designed so that index and asynchronously update it's data files and if it looses it some +of the last updates due to a failure, the journal will re-do those lost operations so that it stays consistent. + + + Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.store.memory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.broker.router.api.Destination; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.ReferenceStore; +import org.apache.activemq.broker.router.util.LinkedNode; +import org.apache.activemq.broker.router.util.LinkedNodeList; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.usage.SystemUsage; + +/** + * + * @author chirino + */ +public class MemoryDataStore extends MemoryStore implements DataStore { + // private final static Log LOG = LogFactory.getLog(MemoryDataStore.class); + + private final MemoryStoreManager stores = new MemoryStoreManager() { + @Override + protected ReferenceStore createStore(String name) { + return new MemoryReferenceStore(name, MemoryDataStore.this); + } + + @Override + protected void destroyStore(ReferenceStore store) { + } + }; + + public class MemoryCacheEntry extends LinkedNode implements CacheEntry { + + // Tracks load requests by the broker.. keeps the message in memory + private int loadCounter; + // Tracks how many reference containers are using this entry.. make it + // easier to delete unused entries. + private int referenceCounter; + + public final long id; + public Message message; + public boolean locked; + public long sequenceId; + + public MemoryCacheEntry(long id, Message data) { + this.id = id; + this.message = data; + } + + public Message getMessage() { + return message; + } + + public long getId() { + return id; + } + + public void incrementRedeliveryCounter() { + } + + public DataStore getStore() { + return MemoryDataStore.this; + } + + synchronized public void load() { + loadCounter++; + } + + synchronized public boolean loadUnlessDropped() { + if (message != null) { + loadCounter++; + return true; + } + return false; + } + + synchronized public void unload() { + if (!(loadCounter >= 0)) { + System.out.println("unload() should match a corresponding load() call. "); + } + + // assert loadCounter >= 0: "unload() should match a corresponding + // load() call. "; + loadCounter--; + if (autoRemove && message != null && loadCounter < 1 && referenceCounter < 1) { + systemUsage.getMemoryUsage().decreaseUsage(message.getSize()); + message = null; + } + } + + synchronized public void reference() { + referenceCounter++; + } + + synchronized public void unreference() { + assert referenceCounter > 0 : "unreference() should only be called to undo a previous reference()"; + referenceCounter--; + if (autoRemove && message != null && loadCounter < 1 && referenceCounter < 1) { + systemUsage.getMemoryUsage().decreaseUsage(message.getSize()); + message = null; + } + } + + @Override + public String toString() { + return "id: " + id + ", linked: " + isLinked() + ", loadCounter: " + loadCounter + ", referenceCounter: " + referenceCounter; + } + + synchronized public boolean lock() { + if (locked) + return false; + locked = true; + return true; + } + + synchronized public void unlock() { + locked = false; + } + + synchronized public void drop() { + if (message != null) { + systemUsage.getMemoryUsage().decreaseUsage(message.getSize()); + message = null; + } + } + } + + private final Map cache = new ConcurrentHashMap(); + private final LinkedNodeList records = new LinkedNodeList(); + private final AtomicReference lastRecordId = new AtomicReference(); + private long lastSequenceId;; + + private SystemUsage systemUsage; + private Destination destination; + private boolean autoRemove; + + public MemoryDataStore(String name) { + super(name); + } + + // ///////////////////////////////////////////////////////////////// + // StoreManager related methods + // ///////////////////////////////////////////////////////////////// + public ReferenceStore addStore(String name) throws Exception { + return stores.addStore(name); + } + + public ReferenceStore getStore(String name) throws Exception { + return stores.getStore(name); + } + + public List getStores() throws Exception { + return stores.getStores(); + } + + public void removeStore(ReferenceStore store) throws Exception { + stores.removeStore(store); + } + + // ///////////////////////////////////////////////////////////////// + // Storage methods + // ///////////////////////////////////////////////////////////////// + public MemoryCacheEntry addMessage(long id, Message record, Runnable onCompleted) throws Exception { + + MemoryCacheEntry rc = new MemoryCacheEntry(id, record); + if (!autoRemove) { + synchronized (records) { + rc.sequenceId = ++lastSequenceId; + records.addLast(rc); + } + cache.put(rc.getId(), rc); + } + systemUsage.getMemoryUsage().increaseUsage(record.getSize()); + rc.load(); + if (onCompleted != null) { + onCompleted.run(); + } + return rc; + } + + public AddMessageTransactionAction addTransactedMessage(final long id, final Message message, final TransactionId tx, Runnable onCompleted) throws Exception { + if (onCompleted != null) { + onCompleted.run(); + } + return new AddMessageTransactionAction() { + + public Message getMessage() { + return message; + } + + public DataStore getDataStore() { + return MemoryDataStore.this; + } + + public TransactionId getTransactionId() { + return tx; + } + + public CacheEntry complete() throws Exception { + return addMessage(id, message, null); + } + + }; + } + + public List load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception { + + ArrayList rc = new ArrayList(max); + if (rc.size() == max) { + return rc; + } + + MemoryCacheEntry first = (MemoryCacheEntry) firstCE; + MemoryCacheEntry last = (MemoryCacheEntry) lastCE; + + boolean inRange = first == null ? true : false; + MemoryCacheEntry record = (MemoryCacheEntry) first; + + synchronized (records) { + if (records.isEmpty()) { + return rc; + } + if (record == null) { + record = records.getHead(); + } + + // record might be a stale cache entry that has already been + // unlinked. But we can still + // use it cause it has valid pointers to the next record location. + MemoryCacheEntry lastId = record; + // MemoryCacheEntry firstId = record; + while (true) { + // Don't load more than the max messages. + if (inRange) { + if (rc.size() == max) { + return rc; + } + if (record.isLinked() && (last == null || record.sequenceId <= last.sequenceId)) { + rc.add(record); + } else { + // Don't load past the last message. + break; + } + } else { + // Don't load before the first message. + if (record.sequenceId > first.sequenceId) { + inRange = true; + if (record.isLinked()) { + rc.add(record); + } + } + } + // Dont go past that last record. + if (record == records.getTail()) { + break; + } + record = record.getNextCircular(); + + // Lets make sure we are not looping around. + assert record.sequenceId > lastId.sequenceId : "Load seems to be looping around the linked list."; + lastId = record; + } + ; + } + return rc; + } + + public void remove(long id, Runnable onCompleted) throws Exception { + MemoryCacheEntry record = cache.remove(id); + if (record != null) { + record.drop(); + synchronized (records) { + record.unlink(); + } + } + if (onCompleted != null) { + onCompleted.run(); + } + } + + public RemoveMesageTransactionAction removeTransacted(final long id, final TransactionId tx, Runnable onCompleted) throws Exception { + if (onCompleted != null) { + onCompleted.run(); + } + return new RemoveMesageTransactionAction() { + public long getId() { + return id; + } + + public DataStore getDataStore() { + return MemoryDataStore.this; + } + + public TransactionId getTransactionId() { + return tx; + } + + public void complete() throws Exception { + remove(id, null); + } + }; + } + + public void removeUnreferencedRecords(CacheEntry maxId) throws Exception { + // We remove unreferenced records on the fly using reference counting.. + } + + public long size() throws Exception { + synchronized (records) { + return records.size(); + } + } + + public MemoryCacheEntry getLastAddedEntry() throws Exception { + return lastRecordId.get(); + } + + public MemoryCacheEntry load(long id) { + MemoryCacheEntry rc = cache.get(id); + if (rc != null) { + rc.load(); + } + return rc; + } + + public Destination getDestination() { + return destination; + } + + public void setDestination(Destination destination) { + this.destination = destination; + this.systemUsage = destination.getSystemUsage(); + } + + public void setAutoRemove(boolean autoRemove) { + this.autoRemove = autoRemove; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.store.memory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.DataStoreManager; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; + +/** + * + * @author chirino + */ +public class MemoryDataStoreManager implements DataStoreManager { + + private final MemoryStoreManager stores = new MemoryStoreManager() { + @Override + protected DataStore createStore(String name) { + return new MemoryDataStore(name); + } + + @Override + protected void destroyStore(DataStore store) { + } + }; + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + public DataStore addStore(String name) throws Exception { + return stores.addStore(name); + } + + public DataStore getStore(String name) throws Exception { + return stores.getStore(name); + } + + public List getStores() throws Exception { + return stores.getStores(); + } + + public void removeStore(DataStore store) throws Exception { + stores.removeStore(store); + } + + public void record(TransactionInfo txOperation, Runnable onComplete) throws Exception { + } + + public Map> recoverPendingTransactions() throws Exception { + return new HashMap>(); + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.store.memory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.ReferenceStore; +import org.apache.activemq.broker.router.store.memory.MemoryDataStore.MemoryCacheEntry; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.TransactionId; + +/** + * + * @author chirino + */ +public class MemoryReferenceStore extends MemoryStore implements ReferenceStore { + + private final MemoryDataStore dataStore; + private final LinkedList references = new LinkedList(); + private ReferenceCacheEntry lastRecordId; + private long lastSequenceId; + + static class ReferenceCacheEntry implements CacheEntry { + final MemoryCacheEntry cacheEntry; + final long sequenceId; + + public ReferenceCacheEntry(MemoryCacheEntry cacheEntry, long sequenceId) { + this.cacheEntry = cacheEntry; + this.sequenceId = sequenceId; + } + + public long getId() { + return cacheEntry.getId(); + } + + public Message getMessage() { + return cacheEntry.getMessage(); + } + + public DataStore getStore() { + return cacheEntry.getStore(); + } + + public void incrementRedeliveryCounter() { + cacheEntry.incrementRedeliveryCounter(); + } + + public void load() { + cacheEntry.load(); + } + + public boolean lock() { + return cacheEntry.lock(); + } + + public void unload() { + cacheEntry.unload(); + } + + public void unlock() { + cacheEntry.unlock(); + } + } + + public MemoryReferenceStore(String name, MemoryDataStore dataStore) { + super(name); + this.dataStore = dataStore; + } + + public void addReference(CacheEntry cacheEntry) throws Exception { + MemoryCacheEntry mce = (MemoryCacheEntry) cacheEntry; + mce.reference(); + synchronized (this) { + lastRecordId = new ReferenceCacheEntry(mce, ++lastSequenceId); + references.add(lastRecordId); + } + } + + public List load(CacheEntry firstCE, CacheEntry lastCE, int max) throws Exception { + + ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE; + ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE; + + ArrayList rc = new ArrayList(max); + synchronized (this) { + + boolean inRange = first == null ? true : false; + + for (Iterator iterator = references.iterator(); iterator.hasNext();) { + ReferenceCacheEntry record = iterator.next(); + + // Don't load more than the max messages. + if (rc.size() == max) { + break; + } + if (inRange) { + if (last == null || record.sequenceId <= last.sequenceId) { + if (record.cacheEntry.loadUnlessDropped()) { + rc.add(record); + } else { + iterator.remove(); + } + } else { + // Don't load past the last message. + break; + } + } else { + // Don't load before the first message.. + if (first == null || record.sequenceId > first.sequenceId) { + inRange = true; + if (record.cacheEntry.loadUnlessDropped()) { + rc.add(record); + } else { + iterator.remove(); + } + } + } + } + + } + + return rc; + } + + public List remove(CacheEntry firstCE, CacheEntry lastCE, int max) { + ReferenceCacheEntry first = (ReferenceCacheEntry) firstCE; + ReferenceCacheEntry last = (ReferenceCacheEntry) lastCE; + + ArrayList rc = new ArrayList(max); + if (rc.size() == max) { + return rc; + } + + synchronized (this) { + + boolean inRange = first == null ? true : false; + for (Iterator iterator = references.iterator(); iterator.hasNext();) { + ReferenceCacheEntry record = iterator.next(); + + if (inRange) { + // Don't load more than the max messages. + if (rc.size() == max) { + break; + } + if (last == null || record.sequenceId < last.sequenceId) { + if (record.cacheEntry.loadUnlessDropped()) { + record.cacheEntry.unreference(); + rc.add(record); + } + iterator.remove(); + } else { + // Don't load past the last message. + break; + } + } else { + // Don't load before the first message.. + if (record.sequenceId > first.sequenceId) { + inRange = true; + if (record.cacheEntry.loadUnlessDropped()) { + record.cacheEntry.unreference(); + rc.add(record); + } + iterator.remove(); + } + } + } + } + return rc; + } + + public long size() throws Exception { + return references.size(); + } + + public CacheEntry getLastAddedEntry() throws Exception { + return lastRecordId; + } + + public void remove(long id, Runnable onCompleted) throws Exception { + synchronized (this) { + for (Iterator iterator = references.iterator(); iterator.hasNext();) { + ReferenceCacheEntry ce = iterator.next(); + if (ce.getId() == id) { + ce.cacheEntry.unreference(); + iterator.remove(); + break; + } + } + } + if (onCompleted != null) { + onCompleted.run(); + } + } + + public RemoveReferenceTransactionAction removeReferenceTransacted(final long id, final TransactionId tx, Runnable onCompleted) throws Exception { + if (onCompleted != null) { + onCompleted.run(); + } + return new RemoveReferenceTransactionAction() { + + public long getId() { + return id; + } + + public ReferenceStore getReferenceStore() { + return MemoryReferenceStore.this; + } + + public DataStore getDataStore() { + return dataStore; + } + + public TransactionId getTransactionId() { + return tx; + } + + public void complete() throws Exception { + remove(id, null); + } + }; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.store.memory; + +import java.util.Map; + +import org.apache.activemq.broker.router.store.api.Store; + +/** + * + * @author chirino + */ +abstract public class MemoryStore implements Store { + + private String name; + private Map properties; + + public MemoryStore(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setProperties(Map properties) throws Exception { + this.properties = properties; + } + + public Map getProperties() throws Exception { + return properties; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.store.memory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.activemq.broker.router.store.api.Store; +import org.apache.activemq.broker.router.store.api.StoreManager; + +/** + * + * @author chirino + */ +abstract public class MemoryStoreManager implements StoreManager { + + private HashMap stores = new HashMap(); + + synchronized public T addStore(String name) throws Exception { + T rc = stores.get(name); + if (rc == null) { + rc = createStore(name); + stores.put(name, rc); + } + return rc; + } + + abstract protected T createStore(String name); + + abstract protected void destroyStore(T store); + + synchronized public T getStore(String name) throws Exception { + return stores.get(name); + } + + synchronized public List getStores() throws Exception { + ArrayList rc = new ArrayList(); + rc.addAll(stores.values()); + return rc; + } + + synchronized public void removeStore(T store) throws Exception { + stores.remove(store.getName()); + destroyStore(store); + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html Tue Mar 4 13:01:41 2008 @@ -0,0 +1,28 @@ + + + + + + +A non-persistent implementation of the org.apache.activemq.broker.router.store.api package. +All messages put in this store are held in ram and count against the Destinations's MemoryUsage. + +TODO: subclass this implementation so that we can spool old messages off to disk. + + + Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.command.MessageId; + +/** + * Utility class used to check for duplicate and miss messages. Keeps track of a + * SequenceSet per producer. + * + * @author chirino + */ +public class DuplicateAndMissedChecker { + + final HashMap sequences = new HashMap(); + final ArrayList orderCheckers = new ArrayList(); + + public void onMessageId(MessageId messageId) { + String pid = messageId.getProducerId().toString(); + long id = messageId.getProducerSequenceId(); + onMessage(pid, id); + } + + /** + * + * @param pid + * @param id + * @throws IllegalArgumentException + * is the message id is a duplicate + */ + public void onMessage(String pid, long id) throws IllegalArgumentException { + SequenceSet sequenceSet = getSequenceSet(pid); + if (!sequenceSet.add(id)) { + throw new IllegalArgumentException("Message from producer: " + pid + " was a duplicate: " + id); + } + } + + private SequenceSet getSequenceSet(String pid) { + SequenceSet sequenceSet; + synchronized (sequences) { + sequenceSet = sequences.get(pid); + if (sequenceSet == null) { + sequenceSet = new SequenceSet(); + sequences.put(pid, sequenceSet); + } + } + return sequenceSet; + } + + public void addOrderChecker(OrderChecker checker) { + synchronized (orderCheckers) { + orderCheckers.add(checker); + } + } + + public void removeOrderChecker(OrderChecker checker) { + synchronized (orderCheckers) { + orderCheckers.remove(checker); + } + } + + public Map> findMissingMessages() { + + Map> missing = new HashMap>(); + + HashMap sequences; + synchronized (this.sequences) { + sequences = new HashMap(this.sequences); + } + + for (Map.Entry entry : sequences.entrySet()) { + Long firstId = null; + synchronized (orderCheckers) { + for (OrderChecker checker : orderCheckers) { + Long id = checker.getLastIdForProducer(entry.getKey()); + if (id != null) { + if (firstId == null) { + firstId = id; + } else { + if (id.compareTo(firstId) <= 0) { + firstId = id; + } + } + } else { + // We can't reliably know the minum last value that we + // should have received + // until all the consumer report at least 1 received + // message. + firstId = null; + break; + } + } + } + if (firstId != null) { + missing.put(entry.getKey(), entry.getValue().getMissing(1, firstId)); + } + } + + return missing; + } + + public Map> getMissingMessages(long first, long last) { + + HashMap sequences; + synchronized (this.sequences) { + sequences = new HashMap(this.sequences); + } + + Map> missing = new HashMap>(); + for (Map.Entry entry : sequences.entrySet()) { + missing.put(entry.getKey(), entry.getValue().getMissing(first, last)); + } + return missing; + } + + public Map> getReceivedMessages() { + Map> received = new HashMap>(); + + HashMap sequences; + synchronized (this.sequences) { + sequences = new HashMap(this.sequences); + } + for (Map.Entry entry : sequences.entrySet()) { + received.put(entry.getKey(), entry.getValue().getReceived()); + } + return received; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +/** + * Provides a base class for you to extend when you want object to maintain a + * doubly linked list to other objects without using a collection class. + * + * @author chirino + */ +public class LinkedNode> { + + protected LinkedNodeList list; + protected T next; + protected T prev; + + public LinkedNode() { + } + + @SuppressWarnings("unchecked") + private T getThis() { + return (T) this; + } + + public T getHeadNode() { + return list.head; + } + + public T getTailNode() { + return list.head.prev; + } + + public T getNext() { + return isTailNode() ? null : next; + } + + public T getPrevious() { + return isHeadNode() ? null : prev; + } + + public T getNextCircular() { + return next; + } + + public T getPreviousCircular() { + return prev; + } + + public boolean isHeadNode() { + return list.head == this; + } + + public boolean isTailNode() { + return list.head.prev == this; + } + + /** + * @param node + * the node to link after this node. + * @return this + */ + public void linkAfter(T node) { + if (node == this) { + throw new IllegalArgumentException("You cannot link to yourself"); + } + if (node.list != null) { + throw new IllegalArgumentException("You only insert nodes that are not in a list"); + } + if (list == null) { + throw new IllegalArgumentException("This node is not yet in a list"); + } + + node.list = list; + + // given we linked this<->next and are inserting node in between + node.prev = getThis(); // link this<-node + node.next = next; // link node->next + next.prev = node; // link node<-next + next = node; // this->node + list.size++; + } + + /** + * @param rightList + * the node to link after this node. + * @return this + */ + public void linkAfter(LinkedNodeList rightList) { + + if (rightList == list) { + throw new IllegalArgumentException("You cannot link to yourself"); + } + if (list == null) { + throw new IllegalArgumentException("This node is not yet in a list"); + } + + T rightHead = rightList.head; + T rightTail = rightList.head.prev; + list.reparent(rightList); + + // given we linked this<->next and are inserting list in between + rightHead.prev = getThis(); // link this<-list + rightTail.next = next; // link list->next + next.prev = rightTail; // link list<-next + next = rightHead; // this->list + list.size++; + } + + /** + * @param node + * the node to link after this node. + * @return + * @return this + */ + public void linkBefore(T node) { + + if (node == this) { + throw new IllegalArgumentException("You cannot link to yourself"); + } + if (node.list != null) { + throw new IllegalArgumentException("You only insert nodes that are not in a list"); + } + if (list == null) { + throw new IllegalArgumentException("This node is not yet in a list"); + } + + node.list = list; + + // given we linked prev<->this and are inserting node in between + node.next = getThis(); // node->this + node.prev = prev; // prev<-node + prev.next = node; // prev->node + prev = node; // node<-this + + if (this == list.head) { + list.head = node; + } + list.size++; + } + + /** + * @param leftList + * the node to link after this node. + * @return + * @return this + */ + public void linkBefore(LinkedNodeList leftList) { + + if (leftList == list) { + throw new IllegalArgumentException("You cannot link to yourself"); + } + if (list == null) { + throw new IllegalArgumentException("This node is not yet in a list"); + } + + T leftHead = leftList.head; + T leftTail = leftList.head.prev; + list.reparent(leftList); + + // given we linked prev<->this and are inserting list in between + leftTail.next = getThis(); // list->this + leftHead.prev = prev; // prev<-list + prev.next = leftHead; // prev->list + prev = leftTail; // list<-this + + if (isHeadNode()) { + list.head = leftHead; + } + } + + public void linkToTail(LinkedNodeList target) { + if (list != null) { + throw new IllegalArgumentException("This node is already linked to a node"); + } + + if (target.head == null) { + next = prev = target.head = getThis(); + list = target; + list.size++; + } else { + target.head.prev.linkAfter(getThis()); + } + } + + public void linkToHead(LinkedNodeList target) { + if (list != null) { + throw new IllegalArgumentException("This node is already linked to a node"); + } + + if (target.head == null) { + next = prev = target.head = getThis(); + list = target; + list.size++; + } else { + target.head.linkBefore(getThis()); + } + } + + /** + * Removes this node out of the linked list it is chained in. + */ + public void unlink() { + + // If we are allready unlinked... + if (list == null) { + return; + } + + if (getThis() == prev) { + // We are the only item in the list + list.head = null; + } else { + // given we linked prev<->this<->next + next.prev = prev; // prev<-next + prev.next = next; // prev->next + + if (isHeadNode()) { + list.head = next; + } + } + list.size--; + list = null; + } + + /** + * Splits the list into 2 lists. This node becomes the tail of this list. + * Then 2nd list is returned. + * + * @return An empty list if this is a tail node. + */ + public LinkedNodeList splitAfter() { + + if (isTailNode()) { + return new LinkedNodeList(); + } + + // Create the new list + LinkedNodeList newList = new LinkedNodeList(); + newList.head = next; + + // Update the head and tail of the new list so that they point to each + // other. + newList.head.prev = list.head.prev; // new list: tail<-head + newList.head.prev.next = newList.head; // new list: tail->head + next = list.head; // old list: tail->head + list.head.prev = getThis(); // old list: tail<-head + + // Update all the nodes in the new list so that they know of their new + // list owner. + T n = newList.head; + newList.size++; + list.size--; + do { + n.list = newList; + n = n.next; + newList.size++; + list.size--; + } while (n != newList.head); + + return newList; + } + + /** + * Splits the list into 2 lists. This node becomes the head of this list. + * Then 2nd list is returned. + * + * @return An empty list if this is a head node. + */ + public LinkedNodeList splitBefore() { + + if (isHeadNode()) { + return new LinkedNodeList(); + } + + // Create the new list + LinkedNodeList newList = new LinkedNodeList(); + newList.head = list.head; + list.head = getThis(); + + T newListTail = prev; + + prev = newList.head.prev; // old list: tail<-head + prev.next = getThis(); // old list: tail->head + newList.head.prev = newListTail; // new list: tail<-head + newListTail.next = newList.head; // new list: tail->head + + // Update all the nodes in the new list so that they know of their new + // list owner. + T n = newList.head; + newList.size++; + list.size--; + do { + n.list = newList; + n = n.next; + newList.size++; + list.size--; + } while (n != newList.head); + + return newList; + } + + public boolean isLinked() { + return list != null; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +/** + * Provides a list of LinkedNode objects. + * + * @author chirino + */ +public class LinkedNodeList> { + + T head; + int size; + + public LinkedNodeList() { + } + + public boolean isEmpty() { + return head == null; + } + + public void addLast(T node) { + node.linkToTail(this); + } + + public void addFirst(T node) { + node.linkToHead(this); + } + + public T getHead() { + return head; + } + + public T getTail() { + return head.prev; + } + + public void addLast(LinkedNodeList list) { + if (list.isEmpty()) { + return; + } + if (head == null) { + reparent(list); + head = list.head; + list.head = null; + } else { + getTail().linkAfter(list); + } + } + + public void addFirst(LinkedNodeList list) { + if (list.isEmpty()) { + return; + } + if (head == null) { + reparent(list); + head = list.head; + list.head = null; + } else { + getHead().linkBefore(list); + } + } + + public T reparent(LinkedNodeList list) { + size += list.size; + T n = list.head; + do { + n.list = this; + n = n.next; + } while (n != list.head); + list.head = null; + list.size = 0; + return n; + } + + /** + * Move the head to the tail and returns the new head node. + * + * @return + */ + public T rotate() { + return head = head.getNext(); + } + + public int size() { + return size; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.HashMap; + +import org.apache.activemq.command.MessageId; + +/** + * + * @author chirino + */ +public class OrderChecker { + final DuplicateAndMissedChecker next; + final HashMap lastMessageIds = new HashMap(); + + public OrderChecker() { + this.next = null; + } + + public OrderChecker(DuplicateAndMissedChecker next) { + this.next = next; + this.next.addOrderChecker(this); + } + + public void onMessageId(MessageId messageId) { + String pid = messageId.getProducerId().toString(); + long id = messageId.getProducerSequenceId(); + onMessage(pid, id); + } + + public void onMessage(String pid, long id) throws IllegalArgumentException { + synchronized (lastMessageIds) { + Long last = lastMessageIds.get(pid); + if (last != null) { + if (id <= last) { + throw new IllegalArgumentException("Got out of order message from producer: " + pid + " got " + id + ", expected a message larger than " + last); + } + } + lastMessageIds.put(pid, id); + } + if (next != null) { + next.onMessage(pid, id); + } + } + + Long getLastIdForProducer(String pid) { + synchronized (lastMessageIds) { + return lastMessageIds.get(pid); + } + } +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +/** + * + * @author chirino + */ +public interface Selectable { + + public T getValue(); + + public void setEnabled(boolean enabled); + + public void setProcessed(boolean enabled); + + public void close(); + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * + * @author chirino + */ +public class Selector { + + final Object mutex = new Object() { + }; + + final LinkedNodeList enabled = new LinkedNodeList(); + final LinkedNodeList disabled = new LinkedNodeList(); + Runnable runOnWakeup; + + private class SelectableImpl extends LinkedNode implements Selectable { + + private T value; + private boolean on; + private boolean processed = true; + + public SelectableImpl(T value) { + this.value = value; + } + + public T getValue() { + return value; + } + + public void setEnabled(boolean on) { + synchronized (mutex) { + if (this.on == on) { + return; + } + this.on = on; + unlink(); + linkSelector(); + } + } + + public void setProcessed(boolean processed) { + synchronized (mutex) { + if (this.processed == processed) { + return; + } + this.processed = processed; + unlink(); + linkSelector(); + } + } + + public void close() { + synchronized (mutex) { + unlink(); + } + } + + private void linkSelector() { + if (on && processed) { + enabled.addLast(this); + wakeup(); + } else { + disabled.addLast(this); + } + } + + } + + public Selectable create(T value) { + SelectableImpl rc = new SelectableImpl(value); + synchronized (mutex) { + rc.linkSelector(); + } + return rc; + } + + /** + * + * @return + */ + public List> pollNoWait() { + synchronized (mutex) { + ArrayList> rc = new ArrayList>(); + SelectableImpl curr = enabled.getHead(); + while (curr != null) { + curr.on = false; + curr.processed = false; + rc.add(curr); + curr = curr.getNext(); + } + disabled.addLast(enabled); + return rc; + } + } + + public void wakeup() { + Runnable wakeup; + synchronized (mutex) { + mutex.notifyAll(); + wakeup = runOnWakeup; + } + if (wakeup != null) { + wakeup.run(); + } + } + + public Runnable getRunOnWakeup() { + synchronized (mutex) { + return runOnWakeup; + } + } + + public void setRunOnWakeup(Runnable runOnWakeup) { + synchronized (mutex) { + this.runOnWakeup = runOnWakeup; + } + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A thread pool which polls a list of Selectors to find runnable tasks to + * execute. All enabled tasks in a Selector are executed before servicing tasks + * in a subsequent selector. This allows you prioritize tasks by adding them to + * a selector ahead of another one. + */ +public class SelectorThreadPool implements Runnable { + + private final Object workMutex = new Object() { + }; + private boolean workAvailable; + + private final Object selectorsCowMutex = new Object(); + private ArrayList> selectors = new ArrayList>(); + + private final String name; + private AtomicBoolean run = new AtomicBoolean(true); + + private ThreadPoolExecutor threadPool; + private final Runnable runOnWakeupTask = new Runnable() { + public void run() { + wakeupPoller(); + } + }; + + AtomicLong nextId = new AtomicLong(); + + public SelectorThreadPool(final String name, int min, int max) { + this.name = name; + threadPool = new ThreadPoolExecutor(min, max, 1, TimeUnit.SECONDS, new SynchronousQueue()); + threadPool.setThreadFactory(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread rc = new Thread(runnable, name + ": worker " + nextId.incrementAndGet()); + return rc; + } + }); + } + + public void add(Selector selector) { + selector.setRunOnWakeup(runOnWakeupTask); + synchronized (selectorsCowMutex) { + selectors = new ArrayList>(selectors); + selectors.add(selector); + } + wakeupPoller(); + } + + public void remove(Selector selector) { + synchronized (selectorsCowMutex) { + selectors = new ArrayList>(selectors); + if (selectors.remove(selector)) { + selector.setRunOnWakeup(null); + } + } + } + + private ArrayList> getSelectors() { + synchronized (selectorsCowMutex) { + return selectors; + } + } + + @SuppressWarnings("unchecked") + public void run() { + try { + while (run.get()) { + synchronized (workMutex) { + workAvailable = false; + } + + int counter = 0; + ArrayList> selectors = getSelectors(); + for (Selector selector : selectors) { + List runnables = selector.pollNoWait(); + for (Selectable selectable : runnables) { + final Selectable target = selectable; + Runnable wrapper = new Runnable() { + public void run() { + try { + ((Runnable) target.getValue()).run(); + } catch (Throwable e) { + // TODO: add logging + e.printStackTrace(); + } finally { + target.setProcessed(true); + wakeupPoller(); + } + } + + }; + try { + threadPool.execute(wrapper); + counter++; + } catch (RejectedExecutionException e) { + // If we could not allocate a thread to + // do that work.. then keep it enabled. + selectable.setEnabled(true); + break; + } + } + } + if (counter == 0) { + synchronized (workMutex) { + while (!workAvailable && run.get()) { + workMutex.wait(); + } + } + } + } + } catch (InterruptedException e) { + // Someone wants this thread to die. + Thread.currentThread().interrupt(); + } + } + + private void wakeupPoller() { + synchronized (workMutex) { + if (!workAvailable) { + workAvailable = true; + workMutex.notify(); + } + } + } + + public void start() { + run.set(true); + Thread thread = new Thread(this, name + ": poller"); + thread.start(); + } + + public void stop() { + run.set(false); + wakeupPoller(); + threadPool.shutdownNow(); + } +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +/** + * Represents a range of numbers. + * + * @author chirino + */ +public class Sequence extends LinkedNode { + long first; + long last; + + public Sequence(long value) { + first = last = value; + } + + public Sequence(long first, long last) { + this.first = first; + this.last = last; + } + + public boolean isAdjacentToLast(long value) { + return last + 1 == value; + } + + public boolean isAdjacentToFirst(long value) { + return first - 1 == value; + } + + public boolean contains(long value) { + return first <= value && value <= last; + } + + @Override + public String toString() { + return first == last ? "" + first : first + "-" + last; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * Keeps track of a added long values. Collapses ranges of numbers using a + * Sequence representation. Use to keep track of received message ids to find + * out if a message is duplicate or if there are any missing messages. + * + * @author chirino + */ +public class SequenceSet { + LinkedNodeList sequences = new LinkedNodeList(); + + /** + * + * @param value + * the value to add to the list + * @return false if the value was a duplicate. + */ + synchronized public boolean add(long value) { + + if (sequences.isEmpty()) { + sequences.addFirst(new Sequence(value)); + return true; + } + + Sequence sequence = sequences.getHead(); + while (sequence != null) { + + if (sequence.isAdjacentToLast(value)) { + // grow the sequence... + sequence.last = value; + // it might connect us to the next sequence.. + if (sequence.getNext() != null) { + Sequence next = sequence.getNext(); + if (next.isAdjacentToFirst(value)) { + // Yep the sequence connected.. so join them. + sequence.last = next.last; + next.unlink(); + } + } + return true; + } + + if (sequence.isAdjacentToFirst(value)) { + // grow the sequence... + sequence.first = value; + + // it might connect us to the previous + if (sequence.getPrevious() != null) { + Sequence prev = sequence.getPrevious(); + if (prev.isAdjacentToLast(value)) { + // Yep the sequence connected.. so join them. + sequence.first = prev.first; + prev.unlink(); + } + } + return true; + } + + // Did that value land before this sequence? + if (value < sequence.first) { + // Then insert a new entry before this sequence item. + sequence.linkBefore(new Sequence(value)); + return true; + } + + // Did that value land within the sequence? The it's a duplicate. + if (sequence.contains(value)) { + return false; + } + + sequence = sequence.getNext(); + } + + // Then the value is getting appended to the tail of the sequence. + sequences.addLast(new Sequence(value)); + return true; + } + + /** + * @return all the id Sequences that are missing from this set that are not + * in between the range provided. + */ + synchronized public List getMissing(long first, long last) { + ArrayList rc = new ArrayList(); + if (first > last) { + throw new IllegalArgumentException("First cannot be more than last"); + } + if (sequences.isEmpty()) { + // We are missing all the messages. + rc.add(new Sequence(first, last)); + return rc; + } + + Sequence sequence = sequences.getHead(); + while (sequence != null && first <= last) { + if (sequence.contains(first)) { + first = sequence.last + 1; + } else { + if (first < sequence.first) { + if (last < sequence.first) { + rc.add(new Sequence(first, last)); + return rc; + } else { + rc.add(new Sequence(first, sequence.first - 1)); + first = sequence.last + 1; + } + } + } + sequence = sequence.getNext(); + } + + if (first <= last) { + rc.add(new Sequence(first, last)); + } + return rc; + } + + /** + * @return all the Sequence that are in this list + */ + synchronized public List getReceived() { + ArrayList rc = new ArrayList(sequences.size()); + Sequence sequence = sequences.getHead(); + while (sequence != null) { + rc.add(new Sequence(sequence.first, sequence.last)); + sequence = sequence.getNext(); + } + return rc; + } +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.util; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.Service; + +/** + * Used to keep track of how much of something is being used so that a + * productive working set usage can be controlled. Main use case is manage + * memory usage. + * + * @org.apache.xbean.XBean + * @version $Revision: 1.3 $ + */ +public class Usage implements Service { + + protected final Object usageMutex = new Object(); + protected int percentUsage; + private int percentUsageMinDelta = 1; + private String name; + private AtomicBoolean started = new AtomicBoolean(); + private long usage; + private long limit; + + public Usage() { + this("default"); + } + + public Usage(String name) { + this.name = name; + } + + /** + * @throws InterruptedException + */ + public void waitForSpace() throws InterruptedException { + synchronized (usageMutex) { + for (int i = 0; percentUsage >= 100; i++) { + usageMutex.wait(); + } + } + } + + /** + * @param timeout + * @throws InterruptedException + * @return true if space + */ + public boolean waitForSpace(long timeout) throws InterruptedException { + synchronized (usageMutex) { + if (percentUsage >= 100) { + usageMutex.wait(timeout); + } + return percentUsage < 100; + } + } + + public boolean isFull() { + synchronized (usageMutex) { + return percentUsage >= 100; + } + } + + /** + * Tries to increase the usage by value amount but blocks if this object is + * currently full. + * + * @param value + * @throws InterruptedException + */ + public void enqueueUsage(long value) throws InterruptedException { + waitForSpace(); + increaseUsage(value); + } + + /** + * Increases the usage by the value amount. + * + * @param value + */ + public void increaseUsage(long value) { + if (value == 0) { + return; + } + int percentUsage; + synchronized (usageMutex) { + usage += value; + percentUsage = caclPercentUsage(); + } + setPercentUsage(percentUsage); + } + + /** + * Decreases the usage by the value amount. + * + * @param value + */ + public void decreaseUsage(long value) { + if (value == 0) { + return; + } + int percentUsage; + synchronized (usageMutex) { + usage -= value; + percentUsage = caclPercentUsage(); + } + setPercentUsage(percentUsage); + } + + protected long retrieveUsage() { + return usage; + } + + public long getUsage() { + synchronized (usageMutex) { + return usage; + } + } + + public void setUsage(long usage) { + synchronized (usageMutex) { + this.usage = usage; + } + } + + public long getLimit() { + synchronized (usageMutex) { + return limit; + } + } + + /** + * Sets the memory limit in bytes. Setting the limit in bytes will set the + * usagePortion to 0 since the UsageManager is not going to be portion based + * off the parent. When set using XBean, you can use values such as: "20 + * mb", "1024 kb", or "1 gb" + * + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" + */ + public void setLimit(long limit) { + if (percentUsageMinDelta < 0) { + throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0"); + } + synchronized (usageMutex) { + this.limit = limit; + } + onLimitChange(); + } + + protected void onLimitChange() { + // Reset the percent currently being used. + int percentUsage; + synchronized (usageMutex) { + percentUsage = caclPercentUsage(); + } + setPercentUsage(percentUsage); + } + + /* + * Sets the minimum number of percentage points the usage has to change + * before a UsageListener event is fired by the manager. + */ + public int getPercentUsage() { + synchronized (usageMutex) { + return percentUsage; + } + } + + public int getPercentUsageMinDelta() { + synchronized (usageMutex) { + return percentUsageMinDelta; + } + } + + /** + * Sets the minimum number of percentage points the usage has to change + * before a UsageListener event is fired by the manager. + * + * @param percentUsageMinDelta + */ + public void setPercentUsageMinDelta(int percentUsageMinDelta) { + if (percentUsageMinDelta < 1) { + throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0"); + } + int percentUsage; + synchronized (usageMutex) { + this.percentUsageMinDelta = percentUsageMinDelta; + percentUsage = caclPercentUsage(); + } + setPercentUsage(percentUsage); + } + + protected void setPercentUsage(int value) { + synchronized (usageMutex) { + int oldValue = percentUsage; + percentUsage = value; + if (oldValue != value) { + fireEvent(oldValue, value); + } + } + } + + protected int caclPercentUsage() { + if (limit == 0) { + return 0; + } + return (int) ((((retrieveUsage() * 100) / limit) / percentUsageMinDelta) * percentUsageMinDelta); + } + + private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { + if (started.get()) { + // Switching from being full to not being full.. + if (oldPercentUsage >= 100 && newPercentUsage < 100) { + synchronized (usageMutex) { + usageMutex.notifyAll(); + // for (Iterator iter = new + // ArrayList(callbacks).iterator(); + // iter.hasNext();) { + // Runnable callback = iter.next(); + // getExecutor().execute(callback); + // } + // callbacks.clear(); + } + } + // Let the listeners know on a separate thread + // Runnable listenerNotifier = new Runnable() { + // + // public void run() { + // for (Iterator iter = listeners.iterator(); + // iter.hasNext();) { + // UsageListener l = iter.next(); + // l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); + // } + // } + // + // }; + // getExecutor().execute(listenerNotifier); + } + } + + public String getName() { + return name; + } + + public String toString() { + return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limit + " percentUsageMinDelta=" + percentUsageMinDelta + "%"; + } + + public void start() { + started.set(true); + } + + public void stop() { + if (started.compareAndSet(true, false)) { + synchronized (usageMutex) { + usageMutex.notifyAll(); + } + } + } + + public void setName(String name) { + this.name = name; + } + +}