activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633639 [2/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active...
Date Tue, 04 Mar 2008 21:01:57 GMT
Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.router.api.ClientConnection;
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.api.StoreSubscription;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.filter.LogicExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This client subscription can subscribe to multiple destinations.
+ * 
+ * @author chirino
+ */
+abstract public class MultiDestinationClientSubscription implements ClientSubscription {
+
+    static final private Log LOG = LogFactory.getLog(MultiDestinationClientSubscription.class);
+
+    protected final Router router;
+    protected final ConsumerInfo info;
+    protected final BooleanExpression selector;
+    protected final DestinationFilter destinationFilter;
+    protected final int maxPrefetchSize;
+
+    protected ClientConnection clientConnection;
+    protected boolean started;
+
+    // Useful counters that are atomic so that folks can view them
+    // without locking the mutex.
+    protected AtomicLong enqueueCounter = new AtomicLong(0);
+    protected AtomicLong transmitCounter = new AtomicLong(0);
+    protected AtomicLong dequeueCounter = new AtomicLong(0);
+
+    // We access stores via copy on write since writes are infrequent but need
+    // to do async reads of it.
+    protected ArrayList<StoreSubscription> sleepingStores = new ArrayList<StoreSubscription>();
+    protected ArrayList<StoreSubscription> stores = new ArrayList<StoreSubscription>();
+
+    protected final Object prefetchWindowMutex = new Object() {
+    };
+    // If the client requested that the prefetch window get extended.
+    protected int clientExtension;
+    // The number of un-acked messages in transit to the client.
+    protected int clientPrefetchSize;
+
+    public MultiDestinationClientSubscription(Router router, ConsumerInfo info) throws Exception {
+        this.router = router;
+        this.info = info;
+        this.selector = createSelector(info);
+        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
+        this.maxPrefetchSize = info.getPrefetchSize();
+    }
+
+    public ConsumerInfo getInfo() {
+        return info;
+    }
+
+    public void addStoreSubscription(StoreSubscription destination) {
+        synchronized (prefetchWindowMutex) {
+            // We access stores via copy on write since writes are infrequent
+            // but need to do async reads of it.
+            stores = new ArrayList<StoreSubscription>(stores);
+            stores.add(destination);
+        }
+    }
+
+    public void removeStoreSubscription(StoreSubscription destination) {
+        synchronized (prefetchWindowMutex) {
+            // We access stores via copy on write since writes are infrequent
+            // but need to do async reads of it.
+            stores = new ArrayList<StoreSubscription>(stores);
+            stores.remove(destination);
+        }
+    }
+
+    public void setClientConnection(ClientConnection clientConnection) {
+        this.clientConnection = clientConnection;
+    }
+
+    public boolean matches(ActiveMQDestination destination) {
+        return destinationFilter.matches(destination);
+    }
+
+    public ActiveMQDestination getDestinationName() {
+        return info.getDestination();
+    }
+
+    public boolean matches(Message node, MessageEvaluationContext context) throws IOException {
+        ConsumerId targetConsumerId = node.getTargetConsumerId();
+        if (targetConsumerId != null) {
+            if (!targetConsumerId.equals(info.getConsumerId())) {
+                return false;
+            }
+        }
+        try {
+            return (selector == null || selector.matches(context)) && this.clientConnection.isAllowedToConsume(node);
+        } catch (JMSException e) {
+            return false;
+        }
+    }
+
+    protected static BooleanExpression createSelector(ConsumerInfo info) throws InvalidSelectorException {
+        BooleanExpression rc = null;
+        if (info.getSelector() != null) {
+            rc = new SelectorParser().parse(info.getSelector());
+        }
+        if (info.isNoLocal()) {
+            if (rc == null) {
+                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
+            } else {
+                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
+            }
+        }
+        if (info.getAdditionalPredicate() != null) {
+            if (rc == null) {
+                rc = info.getAdditionalPredicate();
+            } else {
+                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
+            }
+        }
+        return rc;
+    }
+
+    public void start() throws Exception {
+        synchronized (prefetchWindowMutex) {
+            started = true;
+            router.addSubscription(this);
+            if (!stores.isEmpty()) {
+                for (StoreSubscription store : stores) {
+                    store.wakeup();
+                }
+            }
+        }
+    }
+
+    public void stop() throws Exception {
+        synchronized (prefetchWindowMutex) {
+            started = false;
+            router.removeSubscription(this);
+        }
+    }
+
+    public boolean offer(StoreSubscription source, final CacheEntry reference) throws Exception {
+        synchronized (prefetchWindowMutex) {
+            boolean clientPrefetchFulll = clientPrefetchSize >= (maxPrefetchSize + clientExtension);
+            if (clientPrefetchFulll) {
+                sleepingStores.add(source);
+                return false;
+            }
+            clientPrefetchSize++;
+        }
+
+        reference.load();
+        Destination destination = reference.getStore().getDestination();
+        if (destination.lockForDispatch(this, reference)) {
+            enqueueCounter.incrementAndGet();
+            final MessageDispatch md = new MessageDispatch();
+            md.setConsumerId(info.getConsumerId());
+            md.setDestination(info.getDestination());
+            md.setMessage(reference.getMessage());
+            md.setTransmitCallback(new Runnable() {
+                public void run() {
+                    onTransmit(reference);
+                }
+            });
+
+            // We have to synchronize here because onDispatch may want to know
+            // the
+            // exact order that the message was sent in cause the ack handling
+            // code
+            // is dispatch order dependent.
+            dispatch(reference, md);
+        } else {
+            synchronized (prefetchWindowMutex) {
+                clientPrefetchSize--;
+            }
+            reference.unload();
+        }
+        return true;
+    }
+
+    public void offer(StoreSubscription source, List<CacheEntry> list) throws Exception {
+        // for (Iterator<CacheEntry> iterator = list.iterator();
+        // iterator.hasNext();) {
+        // CacheEntry o = iterator.next();
+        // if (offer(source, o)) {
+        // iterator.remove();
+        // } else {
+        // break;
+        // }
+        // }
+        while (!list.isEmpty()) {
+            int estimatedDispatch;
+            synchronized (prefetchWindowMutex) {
+                estimatedDispatch = (clientExtension + maxPrefetchSize) - clientPrefetchSize;
+                if (estimatedDispatch == 0) {
+                    sleepingStores.add(source);
+                    return;
+                }
+                estimatedDispatch = Math.min(estimatedDispatch, list.size());
+                clientPrefetchSize += estimatedDispatch;
+            }
+
+            int dispatched = 0;
+            for (Iterator<CacheEntry> iterator = list.iterator(); iterator.hasNext();) {
+                final CacheEntry reference = iterator.next();
+                if (dispatched >= estimatedDispatch) {
+                    break;
+                }
+
+                Destination destination = reference.getStore().getDestination();
+                if (destination.lockForDispatch(this, reference)) {
+
+                    assert reference.getMessage() != null : "The message should never be null if a lock is successful.";
+
+                    enqueueCounter.incrementAndGet();
+                    final MessageDispatch md = new MessageDispatch();
+                    md.setConsumerId(info.getConsumerId());
+                    md.setDestination(info.getDestination());
+                    md.setMessage(reference.getMessage());
+                    md.setTransmitCallback(new Runnable() {
+                        public void run() {
+                            onTransmit(reference);
+                        }
+                    });
+
+                    // We have to synchronize here because onDispatch may want
+                    // to know the
+                    // exact order that the message was sent in cause the ack
+                    // handling code
+                    // is dispatch order dependent.
+                    dispatch(reference, md);
+                    dispatched++;
+                } else {
+                    reference.unload();
+                }
+                iterator.remove();
+            }
+            int diff = (estimatedDispatch - dispatched);
+            if (diff > 0) {
+                synchronized (prefetchWindowMutex) {
+                    clientPrefetchSize -= diff;
+                    if (dispatched == 0) {
+                        sleepingStores.add(source);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param ref
+     * @param md
+     */
+    protected void dispatch(final CacheEntry ref, final MessageDispatch md) {
+        clientConnection.getTransmissionQueue().enqueue(md);
+    }
+
+    /**
+     * Run when the message is actually sent to the client.
+     * 
+     * @param ref
+     */
+    protected void onTransmit(final CacheEntry ref) {
+        // Let the message get unloaded from memory. Not
+        // needed anymore.
+        transmitCounter.incrementAndGet();
+        ref.unload();
+    }
+
+    public void acknowledge(RequestContext context, MessageAck ack) throws Exception {
+        if (ack.isStandardAck()) {
+            standardAck(context, ack);
+        } else if (ack.isDeliveredAck()) {
+            deliveredAck(ack);
+        } else if (ack.isRedeliveredAck()) {
+            redeliveredAck(ack);
+        } else if (ack.isPoisonAck()) {
+            poisonAck(ack);
+        }
+        wakeup();
+    }
+
+    public void wakeup() {
+        ArrayList<StoreSubscription> stores = null;
+        synchronized (prefetchWindowMutex) {
+            int capacity = (clientExtension + maxPrefetchSize) - clientPrefetchSize;
+            if (started && capacity > (maxPrefetchSize / 2) && !this.sleepingStores.isEmpty()) {
+                stores = this.sleepingStores;
+                this.sleepingStores = new ArrayList<StoreSubscription>(stores.size());
+            }
+        }
+        if (stores != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waking up cause we have enough capacitly to load a big batch: " + this.getInfo().getConsumerId() + ", stores avail: " + stores.size());
+            }
+            for (StoreSubscription store : stores) {
+                store.wakeup();
+            }
+        }
+    }
+
+    abstract protected void poisonAck(MessageAck ack) throws Exception;
+
+    abstract protected void redeliveredAck(MessageAck ack) throws Exception;
+
+    abstract protected void deliveredAck(MessageAck ack) throws Exception;
+
+    abstract protected void standardAck(RequestContext context, MessageAck ack) throws Exception;
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * 
+ * @author chirino
+ */
+public class QualityOfService implements Service {
+
+    public static final String TRANSIENT_PREFIX = "transient:";
+    private LinkedList<BroadcastStoreSubscription> subscriptions = new LinkedList<BroadcastStoreSubscription>();
+    private DataStore dataStore;
+    public long lastId = 0;
+    public CacheEntry lastAddedMessage;
+
+    public void enqueue(RequestContext ctx, Message message, Runnable onStored) throws Exception {
+
+        CacheEntry reference = null;
+        LinkedList<BroadcastStoreSubscription> subs;
+        try {
+            synchronized (this) {
+                lastId++;
+                lastAddedMessage = reference = dataStore.addMessage(lastId, message, onStored);
+                subs = subscriptions;
+            }
+
+            // Wake up the subscriptions so that they push messages out to the
+            // clients.
+            MessageEvaluationContext ec = new MessageEvaluationContext();
+            ec.setDestination(dataStore.getDestination().getName());
+            ec.setMessageReference(message);
+
+            for (BroadcastStoreSubscription subscription : subs) {
+                // Lock the subscription state so that we can properly either
+                // dispatch the message or put it on the pending message
+                // store.
+                if (subscription.matches(message, ec)) {
+                    subscription.offer(reference);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
+            if (reference != null) {
+                reference.unload();
+            }
+        }
+
+    }
+
+    public void addSubscription(ClientSubscription subscription) throws Exception {
+        synchronized (this) {
+            subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
+            ReferenceStore store = dataStore.addStore(TRANSIENT_PREFIX + subscription.getInfo().getConsumerId());
+            BroadcastStoreSubscription ss = new BroadcastStoreSubscription(dataStore, store, subscription);
+            if (lastAddedMessage != null) {
+                ss.recoverUntil(lastAddedMessage);
+            }
+            subscriptions.add(ss);
+            subscription.addStoreSubscription(ss);
+            ss.start();
+            subscription.wakeup();
+        }
+    }
+
+    public void removeSubscription(ClientSubscription subscription) throws Exception {
+        synchronized (this) {
+            subscriptions = new LinkedList<BroadcastStoreSubscription>(subscriptions);
+            for (Iterator<BroadcastStoreSubscription> iterator = subscriptions.iterator(); iterator.hasNext();) {
+                BroadcastStoreSubscription ss = iterator.next();
+                if (ss.getClientSubscription() == subscription) {
+                    ss.getDataStore().removeStore(ss.getStore());
+                    iterator.remove();
+                    break;
+                }
+            }
+        }
+    }
+
+    public DataStore getDataStore() {
+        return dataStore;
+    }
+
+    public void setDataStore(DataStore dataStore) throws Exception {
+        this.dataStore = dataStore;
+    }
+
+    public void start() throws Exception {
+
+        if (dataStore == null) {
+            throw new IllegalArgumentException("The dataStore property must be set before being started.");
+        }
+
+        lastAddedMessage = dataStore.getLastAddedEntry();
+        if (lastAddedMessage != null) {
+            lastId = lastAddedMessage.getId();
+        }
+
+        // Delete any transient stores that be have been left around since the
+        // last restart.
+        List<ReferenceStore> stores = dataStore.getStores();
+        for (ReferenceStore s : stores) {
+            if (s.getName().startsWith(TRANSIENT_PREFIX)) {
+                dataStore.removeStore(s);
+            }
+        }
+    }
+
+    public void stop() throws Exception {
+        List<ReferenceStore> stores = dataStore.getStores();
+        for (ReferenceStore s : stores) {
+            if (s.getName().startsWith(TRANSIENT_PREFIX)) {
+                dataStore.removeStore(s);
+            }
+        }
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.DestinationManager;
+import org.apache.activemq.broker.router.core.SimpleDestinationManager.DestinationManagerCallback;
+import org.apache.activemq.broker.router.core.queue.BroadcastQueue;
+import org.apache.activemq.broker.router.core.queue.QueueSubscription;
+import org.apache.activemq.broker.router.core.topic.Topic;
+import org.apache.activemq.broker.router.core.topic.TopicSubscription;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.broker.router.util.Selector;
+import org.apache.activemq.broker.router.util.SelectorThreadPool;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+
+/**
+ * 
+ * @author chirino
+ */
+public class Router implements DestinationManagerCallback {
+
+    private final HashMap<ConsumerId, ClientSubscription> subscriptions = new HashMap<ConsumerId, ClientSubscription>();
+    // Used to monitor subscriptions that need someone to fill their
+    // server side prefetch buffers.
+    private final Selector<Runnable> prefetchSelector = new Selector<Runnable>();
+
+    private DestinationManager destinationManager;
+    private SelectorThreadPool threadPool;
+    private String brokerName;
+    private DataStoreManager persistentDataStoreManager;
+    private DataStoreManager transientDataStoreManager;
+
+    // ////////////////////////////////////////////////////////////////////////
+    // Lifecycle
+    // ////////////////////////////////////////////////////////////////////////
+
+    public void start() throws Exception {
+        if (persistentDataStoreManager == null) {
+            throw new IllegalArgumentException("persistentDataStoreManager must be set");
+        }
+        if (transientDataStoreManager == null) {
+            throw new IllegalArgumentException("transientDataStoreManager must be set");
+        }
+        if (threadPool == null) {
+            throw new IllegalArgumentException("threadPool must be set");
+        }
+        if (destinationManager == null) {
+            throw new IllegalArgumentException("destinationManager must be set");
+        }
+        if (brokerName == null) {
+            throw new IllegalArgumentException("brokerName must be set");
+        }
+        persistentDataStoreManager.start();
+        transientDataStoreManager.start();
+        destinationManager.setDestinationManagerCallback(this);
+        threadPool.add(prefetchSelector);
+        threadPool.start();
+    }
+
+    public void stop() throws Exception {
+        transientDataStoreManager.stop();
+        persistentDataStoreManager.stop();
+        threadPool.stop();
+    }
+
+    // ////////////////////////////////////////////////////////////////////////
+    // DestinationManagerCallback Methods
+    // ////////////////////////////////////////////////////////////////////////
+
+    public BroadcastQueue createQueue(ActiveMQQueue name) {
+        return new BroadcastQueue(this, name);
+    }
+
+    public Topic createTopic(ActiveMQTopic name) {
+        return new Topic(this, name);
+    }
+
+    /**
+     * Matches up existing subscriptions with this destination.
+     * 
+     * @param destination
+     * @throws Exception
+     * @throws JMSException
+     */
+    public void addDestination(Destination destination) throws Exception {
+        synchronized (destinationManager.getMutex()) {
+            // TODO: catch the exception and undo the subscription..
+            // Add all consumers that are interested in the destination.
+            for (ClientSubscription sub : subscriptions.values()) {
+                if (sub.matches(destination.getName())) {
+                    destination.addSubscription(sub);
+                }
+            }
+        }
+    }
+
+    public void removeDestination(Destination destination, boolean force) throws Exception {
+        // Safety check..
+        synchronized (destinationManager.getMutex()) {
+            if (!force) {
+                int counter = 0;
+                for (ClientSubscription sub : subscriptions.values()) {
+                    if (sub.matches(destination.getName())) {
+                        counter++;
+                    }
+                }
+                if (counter != 0) {
+                    throw new JMSException("Destination '" + destination + "' still has an active subscriptions: " + counter);
+                }
+            }
+
+            for (ClientSubscription sub : subscriptions.values()) {
+                if (sub.matches(destination.getName())) {
+                    destination.removeSubscription(sub);
+                }
+            }
+        }
+    }
+
+    // ////////////////////////////////////////////////////////////////////////
+    // Subscription Management
+    // ////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Used by the ClientConnection
+     */
+    public ClientSubscription createSubscription(ConsumerInfo consumerInfo) throws Exception {
+        ClientSubscription rc = null;
+        ActiveMQDestination name = consumerInfo.getDestination();
+        if (name.isQueue()) {
+            rc = new QueueSubscription(this, consumerInfo);
+        } else {
+            rc = new TopicSubscription(this, consumerInfo);
+        }
+        return rc;
+    }
+
+    /**
+     * Used in the Subscription.start()
+     * 
+     * @throws Exception
+     */
+    public void addSubscription(ClientSubscription subscription) throws Exception {
+        synchronized (destinationManager.getMutex()) {
+            // TODO: catch the exception and undo the subscription..
+            subscriptions.put(subscription.getInfo().getConsumerId(), subscription);
+            ActiveMQDestination name = subscription.getDestinationName();
+            Set<Destination> destinations = destinationManager.getDestinations(name);
+            for (Destination destination : destinations) {
+                destination.addSubscription(subscription);
+            }
+        }
+    }
+
+    /**
+     * Used in the Subscription.stop()
+     * 
+     * @throws Exception
+     */
+    public void removeSubscription(ClientSubscription subscription) throws Exception {
+        synchronized (destinationManager.getMutex()) {
+            ActiveMQDestination name = subscription.getDestinationName();
+            Set<Destination> destinations = destinationManager.getDestinations(name);
+            for (Destination destination : destinations) {
+                destination.removeSubscription(subscription);
+            }
+            subscriptions.remove(subscription.getInfo().getConsumerId());
+        }
+    }
+
+    // ////////////////////////////////////////////////////////////////////////
+    // Accessors and Mutators
+    // ////////////////////////////////////////////////////////////////////////
+
+    public Selector<Runnable> getDestinationPrefetchSelector() {
+        return prefetchSelector;
+    }
+
+    public DestinationManager getDestinationManager() {
+        return destinationManager;
+    }
+
+    /**
+     * Flushes all possible caches associated with this broker.
+     */
+    public void fushCaches() {
+    }
+
+    public ClientSubscription getSubscription(ConsumerId consumerId) {
+        return null;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public DataStoreManager getPersistentDataStoreManager() {
+        return persistentDataStoreManager;
+    }
+
+    public DataStoreManager getTransientDataStoreManager() {
+        return transientDataStoreManager;
+    }
+
+    public void setTransientDataStoreManager(DataStoreManager transientDataStoreManager) {
+        this.transientDataStoreManager = transientDataStoreManager;
+    }
+
+    public void setPersistentDataStoreManager(DataStoreManager persistentDataStoreManager) {
+        this.persistentDataStoreManager = persistentDataStoreManager;
+    }
+
+    public SelectorThreadPool getThreadPool() {
+        return threadPool;
+    }
+
+    public void setThreadPool(SelectorThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    public void setDestinationManager(DestinationManager destinationManager) {
+        this.destinationManager = destinationManager;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import org.apache.activemq.broker.router.api.DestinationManager;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory;
+import org.apache.activemq.broker.router.store.memory.MemoryDataStoreManager;
+import org.apache.activemq.broker.router.util.SelectorThreadPool;
+
+/**
+ * 
+ * @author chirino
+ */
+public class RouterFactory {
+
+    protected String brokerName;
+    protected DataStoreManager persistentDataStoreManager;
+    protected DataStoreManager transientDataStoreManager;
+    protected SelectorThreadPool threadPool;
+    protected DestinationManager destinationManager;
+
+    public Router createRouter() throws Exception {
+        Router rc = new Router();
+        rc.setThreadPool(getThreadPool());
+        rc.setDestinationManager(getDestinationManager());
+        rc.setBrokerName(getBrokerName());
+        rc.setPersistentDataStoreManager(getPersistentDataStoreManager());
+        rc.setTransientDataStoreManager(getTransientDataStoreManager());
+        return rc;
+    }
+
+    public String getBrokerName() {
+        if (brokerName == null) {
+            brokerName = "localhost";
+        }
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public DataStoreManager getPersistentDataStoreManager() throws Exception {
+        if (persistentDataStoreManager == null) {
+            JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory();
+            persistentDataStoreManager = factory.createJournalDataStoreManager();
+        }
+        return persistentDataStoreManager;
+    }
+
+    public void setPersistentDataStoreManager(DataStoreManager persistentDataStoreManager) {
+        this.persistentDataStoreManager = persistentDataStoreManager;
+    }
+
+    public DataStoreManager getTransientDataStoreManager() {
+        if (transientDataStoreManager == null) {
+            transientDataStoreManager = new MemoryDataStoreManager();
+        }
+        return transientDataStoreManager;
+    }
+
+    public void setTransientDataStoreManager(DataStoreManager transientDataStoreManager) {
+        this.transientDataStoreManager = transientDataStoreManager;
+    }
+
+    public SelectorThreadPool getThreadPool() {
+        if (threadPool == null) {
+            threadPool = new SelectorThreadPool("ActiveMQ " + brokerName, 10, 50);
+        }
+        return threadPool;
+    }
+
+    public void setThreadPool(SelectorThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    public DestinationManager getDestinationManager() {
+        if (destinationManager == null) {
+            destinationManager = new SimpleDestinationManager();
+        }
+        return destinationManager;
+    }
+
+    public void setDestinationManager(DestinationManager destinationManager) {
+        this.destinationManager = destinationManager;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.DestinationManager;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * 
+ * @author chirino
+ */
+public class SimpleDestinationManager implements DestinationManager {
+
+    private final HashMap<ActiveMQDestination, Destination> destinations = new HashMap<ActiveMQDestination, Destination>();
+    private boolean autoCreate = true;
+    private final Object mutex = new Object();
+    private DestinationManagerCallback destinationManagerCallback;
+
+    public interface DestinationManagerCallback {
+        Destination createQueue(ActiveMQQueue name);
+
+        Destination createTopic(ActiveMQTopic name);
+
+        void addDestination(Destination destination) throws Exception;
+
+        void removeDestination(Destination destination, boolean force) throws Exception;
+    }
+
+    public SimpleDestinationManager() {
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#getDestinations(org.apache.activemq.command.ActiveMQDestination)
+     */
+    public Set<Destination> getDestinations(ActiveMQDestination name) throws Exception {
+        if (name.isPattern()) {
+            throw new IllegalArgumentException("This destination manager does not support wild card names.");
+        }
+
+        if (name.isComposite()) {
+            HashSet<Destination> rc = new HashSet<Destination>();
+            ActiveMQDestination[] compositeDestinations = name.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                Set<Destination> a = getDestinations(compositeDestinations[i]);
+                rc.addAll(a);
+            }
+            return rc;
+        }
+
+        synchronized (mutex) {
+            Destination destination = destinations.get(name);
+            if (destination == null) {
+                if (autoCreate) {
+                    destination = createDestination(name);
+                } else {
+                    throw new JMSException("Could not find destination: " + name);
+                }
+            }
+            return Collections.singleton(destination);
+        }
+
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#createDestination(org.apache.activemq.command.ActiveMQDestination)
+     */
+    public Destination createDestination(ActiveMQDestination name) throws Exception {
+        Destination destination;
+
+        byte type = name.getDestinationType();
+        switch (type) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            destination = destinationManagerCallback.createQueue((ActiveMQQueue) name);
+            break;
+        case ActiveMQDestination.TOPIC_TYPE:
+            destination = destinationManagerCallback.createTopic((ActiveMQTopic) name);
+            break;
+        default:
+            throw new IllegalStateException("Invalid destination type: " + type);
+        }
+
+        destination.start();
+
+        synchronized (mutex) {
+            destinations.put(name, destination);
+            destinationManagerCallback.addDestination(destination);
+        }
+        return destination;
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#destroyDestination(org.apache.activemq.command.ActiveMQDestination,
+     *      boolean)
+     */
+    public void destroyDestination(ActiveMQDestination name, boolean force) throws Exception {
+        Set<Destination> rc = getDestinations(name);
+        for (Destination d : rc) {
+            d.stop();
+            destinationManagerCallback.removeDestination(d, force);
+            destinations.remove(d.getName());
+        }
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#isAutoCreate()
+     */
+    public boolean isAutoCreate() {
+        return autoCreate;
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#setAutoCreate(boolean)
+     */
+    public void setAutoCreate(boolean autoCreate) {
+        this.autoCreate = autoCreate;
+    }
+
+    /**
+     * @see org.apache.activemq.broker.router.perf.core.DestinationManager#getMutex()
+     */
+    public Object getMutex() {
+        return mutex;
+    }
+
+    public DestinationManagerCallback getDestinationManagerCallback() {
+        return destinationManagerCallback;
+    }
+
+    public void setDestinationManagerCallback(DestinationManagerCallback destinationManagerCallback) {
+        this.destinationManagerCallback = destinationManagerCallback;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.broker.router.api.ClientConnection;
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+/**
+ * 
+ * @author chirino
+ */
+public class TransportClientConnection implements ClientConnection {
+
+    private final Transport transport;
+    private final Router router;
+
+    public final class TansportTransmitQueue implements TransmitQueue {
+        List<Command> queue = new LinkedList<Command>();
+
+        synchronized private Command dequeue(long timeout) throws InterruptedException {
+            if (queue.isEmpty()) {
+                wait(timeout);
+                if (queue.isEmpty()) {
+                    return null;
+                }
+            }
+            Command rc = queue.remove(0);
+            return rc;
+        }
+
+        synchronized public void enqueue(Command command) {
+            queue.add(command);
+            notify();
+        }
+
+        synchronized public void enqueueFirst(Command command) {
+            queue.add(0, command);
+            notify();
+        }
+    }
+
+    private final TansportTransmitQueue transmissionQueue = new TansportTransmitQueue();
+
+    // Keeps track of client state.
+    private final Map<ConsumerId, ClientSubscription> subscriptions = Collections.synchronizedMap(new HashMap<ConsumerId, ClientSubscription>());
+
+    public TransportClientConnection(Router router, Transport transport) {
+        this.router = router;
+        this.transport = transport;
+
+        this.transport.setTransportListener(new TransportListener() {
+            public void onCommand(Object command) {
+                TransportClientConnection.this.onCommand((Command) command);
+            }
+
+            public void onException(IOException error) {
+                onTransportError(error);
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
+            }
+        });
+
+    }
+
+    public void start() throws Exception {
+        transport.start();
+        Thread thread = new Thread("ActiveMQ Transmit To: " + transport.getRemoteAddress()) {
+            @Override
+            public void run() {
+                try {
+                    Command command = transmissionQueue.dequeue(100);
+                    while (command != null) {
+                        transport.oneway(command);
+                        if (command.getDataStructureType() == MessageDispatch.DATA_STRUCTURE_TYPE) {
+                            ((MessageDispatch) command).getTransmitCallback().run();
+                        }
+                        command = transmissionQueue.dequeue(100);
+                    }
+                } catch (Throwable e) {
+                    onTransmitError(e);
+                }
+            }
+        };
+        thread.setPriority(9);
+        thread.start();
+    }
+
+    public void stop() throws Exception {
+        transport.stop();
+    }
+
+    public void onCommand(Command command) {
+
+        RequestContext requestContext = new RequestContext();
+        requestContext.router = router;
+        requestContext.clientConnection = this;
+        requestContext.command = command;
+
+        Response response = null;
+        int commandId = command.getCommandId();
+        try {
+            switch (command.getDataStructureType()) {
+            case ActiveMQMessage.DATA_STRUCTURE_TYPE:
+            case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
+            case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
+            case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
+            case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
+                onMessage(requestContext, (Message) command);
+                break;
+            case ConsumerInfo.DATA_STRUCTURE_TYPE:
+                onConsumerInfo(requestContext, (ConsumerInfo) command);
+                break;
+            case MessageAck.DATA_STRUCTURE_TYPE:
+                onMessageAck(requestContext, (MessageAck) command);
+                break;
+            default:
+                throw new RuntimeException("Not yet implemented.");
+            }
+        } catch (Throwable e) {
+            if (command.isResponseRequired()) {
+                response = new ExceptionResponse(e);
+            }
+            onCommandError(e);
+        }
+
+        if (response != null && requestContext.autoRespond) {
+            response.setCorrelationId(commandId);
+            getTransmissionQueue().enqueueFirst(response);
+        }
+    }
+
+    public void onConsumerInfo(RequestContext requestContext, ConsumerInfo info) throws Exception {
+        if (info.isDurable()) {
+            throw new RuntimeException("Not yet implemented.");
+        } else {
+            final ClientSubscription subscription = router.createSubscription(info);
+            subscription.setClientConnection(this);
+            subscriptions.put(info.getConsumerId(), subscription);
+            subscription.start();
+        }
+    }
+
+    public void onMessage(RequestContext requestContext, final Message message) throws Exception {
+
+        ActiveMQDestination name = message.getDestination();
+        final Set<Destination> destinations = router.getDestinationManager().getDestinations(name);
+
+        // Don't auto response.. we will send a manual response once the message
+        // is securely on disk.
+        requestContext.autoRespond = false;
+        Runnable completionHandler = null;
+        if (message.isResponseRequired()) {
+            completionHandler = new Runnable() {
+                AtomicInteger completeCounter = new AtomicInteger(destinations.size());
+
+                public void run() {
+                    if (completeCounter.decrementAndGet() == 0) {
+                        Response response = new Response();
+                        response.setCorrelationId(message.getCommandId());
+                        getTransmissionQueue().enqueueFirst(response);
+                    }
+                }
+            };
+        }
+
+        for (Destination destination : destinations) {
+            destination.enqueue(requestContext, message, completionHandler);
+        }
+    }
+
+    /**
+     * acknowledge a message.
+     * 
+     * @param requestContext
+     * 
+     * @param command
+     * @throws Exception
+     */
+    private void onMessageAck(RequestContext requestContext, MessageAck ack) throws Exception {
+        ClientSubscription subscription = subscriptions.get(ack.getConsumerId());
+        subscription.acknowledge(requestContext, ack);
+    }
+
+    /**
+     * Error occurred while processing a client command. Called from the
+     * transport thread.
+     * 
+     * @param e
+     */
+    private void onCommandError(Throwable e) {
+    }
+
+    /**
+     * Error occurred at the transport layer.. Called from the transport thread.
+     * 
+     * @param error
+     */
+    protected void onTransportError(IOException error) {
+    }
+
+    /**
+     * Error occurred while polling for data to send the client. Called from the
+     * context of the transmit thread.
+     * 
+     * @param error
+     */
+    protected void onTransmitError(Throwable e) {
+    }
+
+    public TransmitQueue getTransmissionQueue() {
+        return transmissionQueue;
+    }
+
+    /**
+     * Called by a Subscription
+     * 
+     * @param node
+     * @return
+     */
+    public boolean isAllowedToConsume(Message node) {
+        return false;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+The implementations of the org.apache.activemq.broker.router.api package.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.BroadcastDestination;
+import org.apache.activemq.broker.router.core.QualityOfService;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+
+/**
+ * 
+ * @author chirino
+ */
+public class BroadcastQueue extends BroadcastDestination {
+
+    // It should be easy to support message priorities by
+    // creating a QoS for each priority level.
+    private final ArrayList<QualityOfService> allQos = new ArrayList<QualityOfService>();
+    private final QualityOfService transientQos = new QualityOfService();
+    private final QualityOfService persistentQos = new QualityOfService();
+
+    public BroadcastQueue(Router router, ActiveMQDestination name) {
+        super(router, name);
+        allQos.add(persistentQos);
+        allQos.add(transientQos);
+    }
+
+    public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception {
+        dataStore.remove(storeId, null);
+    }
+
+    public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) {
+        return ref.lock();
+    }
+
+    protected QualityOfService chooseQosFor(Message message) {
+        if (message.isPersistent()) {
+            return persistentQos;
+        } else {
+            return transientQos;
+        }
+    }
+
+    protected List<QualityOfService> getAllQos() {
+        return allQos;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+        DataStoreManager dsm = router.getPersistentDataStoreManager();
+        DataStore ds = dsm.addStore(getName().getQualifiedName());
+        ds.setDestination(this);
+        persistentQos.setDataStore(ds);
+
+        dsm = router.getTransientDataStoreManager();
+        ds = dsm.addStore(getName().getQualifiedName());
+        ds.setDestination(this);
+        transientQos.setDataStore(ds);
+
+        super.start();
+    }
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.queue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.MultiDestinationClientSubscription;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transaction.Synchronization;
+
+/**
+ * 
+ * @author chirino
+ */
+public class QueueSubscription extends MultiDestinationClientSubscription {
+
+    // Messages that have been dispatched to the client and are either in
+    // transit to the client in the prefetch or being processed.
+    class PrefetchItem {
+        private final MessageId messageId;
+        private final Long storeId;
+        private final DataStore dataStore;
+
+        public PrefetchItem(DataStore dataStore, MessageId messageId, Long storeId) {
+            this.dataStore = dataStore;
+            this.messageId = messageId;
+            this.storeId = storeId;
+        }
+    }
+
+    private final Object dispatchMutex = new Object() {
+    };
+    private final LinkedList<PrefetchItem> clientPrefetch = new LinkedList<PrefetchItem>();
+
+    public QueueSubscription(Router router, ConsumerInfo info) throws Exception {
+        super(router, info);
+    }
+
+    protected void dispatch(final CacheEntry ref, final MessageDispatch md) {
+        // We sync here because we the client has to receive the messages in the
+        // same order placed on
+        // the clientPrefetch list. This is because ack processing is order
+        // dependent.
+        synchronized (dispatchMutex) {
+            synchronized (clientPrefetch) {
+                clientPrefetch.add(new PrefetchItem(ref.getStore(), md.getMessage().getMessageId(), ref.getId()));
+            }
+            super.dispatch(ref, md);
+        }
+    }
+
+    protected void standardAck(final RequestContext context, final MessageAck ack) throws Exception {
+        boolean inAckRange = false;
+        boolean callDispatchMatched = false;
+        final List<PrefetchItem> ackedMessages = new ArrayList<PrefetchItem>();
+
+        // Acknowledge all dispatched messages up till the message id of
+        // the acknowledgment.
+        synchronized (clientPrefetch) {
+
+            for (Iterator<PrefetchItem> iterator = clientPrefetch.iterator(); iterator.hasNext();) {
+                PrefetchItem ref = iterator.next();
+                MessageId messageId = ref.messageId;
+
+                // Only keep going if within the ack range.
+                if (!inAckRange) {
+                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    } else {
+                        continue;
+                    }
+                }
+                if (inAckRange) {
+                    ackedMessages.add(ref);
+                    if (!context.isInTransaction()) {
+                        iterator.remove();
+                    }
+
+                    if (ack.getLastMessageId().equals(messageId)) {
+                        callDispatchMatched = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (context.isInTransaction()) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    dequeueCounter.addAndGet(ackedMessages.size());
+                    synchronized (prefetchWindowMutex) {
+                        clientExtension -= ackedMessages.size();
+                        if (clientExtension < 0) {
+                            clientExtension = 0;
+                        }
+                        clientPrefetchSize -= ackedMessages.size();
+                    }
+                    for (final PrefetchItem ref : ackedMessages) {
+                        synchronized (clientPrefetch) {
+                            clientPrefetch.remove(ref);
+                        }
+                        ref.dataStore.getDestination().dequeue(context, ack, ref.dataStore, ref.storeId);
+                    }
+                }
+            });
+            synchronized (prefetchWindowMutex) {
+                if (maxPrefetchSize != 0) {
+                    clientExtension += ackedMessages.size();
+                }
+            }
+        } else {
+            dequeueCounter.addAndGet(ackedMessages.size());
+            synchronized (prefetchWindowMutex) {
+                clientExtension -= ackedMessages.size();
+                if (clientExtension < 0) {
+                    clientExtension = 0;
+                }
+                clientPrefetchSize -= ackedMessages.size();
+            }
+            for (final PrefetchItem ref : ackedMessages) {
+                ref.dataStore.getDestination().dequeue(context, ack, ref.dataStore, ref.storeId);
+            }
+        }
+
+        if (!callDispatchMatched) {
+            throw new JMSException("Could not correlate acknowledgment with clientPrefetch message: " + ack);
+        }
+
+    }
+
+    protected void deliveredAck(MessageAck ack) throws JMSException {
+        synchronized (prefetchWindowMutex) {
+            clientExtension += ack.getMessageCount();
+        }
+    }
+
+    protected void poisonAck(MessageAck ack) throws JMSException {
+        synchronized (prefetchWindowMutex) {
+
+            // TODO: what if the message is already in a DLQ???
+            // Handle the poison ACK case: we need to send the message to a
+            // DLQ
+            if (ack.isInTransaction()) {
+                throw new JMSException("Poison ack cannot be transacted: " + ack);
+            }
+            // Acknowledge all clientPrefetch messages up till the message id of
+            // the
+            // acknowledgment.
+            // int index = 0;
+            boolean inAckRange = false;
+            List<Message> removeList = new ArrayList<Message>();
+            for (final PrefetchItem ref : clientPrefetch) {
+                MessageId messageId = ref.messageId;
+                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                    inAckRange = true;
+                }
+                if (inAckRange) {
+                    // sendToDLQ(context, node);
+                    // node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                    // removeList.add(node);
+                    // dequeueCounter++;
+                    // index++;
+                    // acknowledge(context, ack, node);
+                    // if (ack.getLastMessageId().equals(messageId)) {
+                    // prefetchExtension = Math.max(0, prefetchExtension -
+                    // (index + 1));
+                    // callDispatchMatched = true;
+                    // break;
+                    // }
+                }
+            }
+            for (final Message node : removeList) {
+                clientPrefetch.remove(node);
+            }
+        }
+    }
+
+    protected void redeliveredAck(MessageAck ack) throws JMSException {
+        synchronized (prefetchWindowMutex) {
+            boolean callDispatchMatched = false;
+            // Message was re-delivered but it was not yet considered to be DLQ
+            // message.
+            // Acknowledge all clientPrefetch messages up till the message id of
+            // the acknowledgment.
+            boolean inAckRange = false;
+            for (final PrefetchItem reference : clientPrefetch) {
+                MessageId messageId = reference.messageId;
+                if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+                    inAckRange = true;
+                }
+                if (inAckRange) {
+                    // TODO: figure out a way to do redeliver counter
+                    // increments..
+                    // reference.incrementRedeliveryCounter();
+                    if (ack.getLastMessageId().equals(messageId)) {
+                        callDispatchMatched = true;
+                        break;
+                    }
+                }
+            }
+            if (!callDispatchMatched) {
+                throw new JMSException("Could not correlate acknowledgment with clientPrefetch message: " + ack);
+            }
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.topic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.BroadcastDestination;
+import org.apache.activemq.broker.router.core.QualityOfService;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+
+/**
+ * 
+ * @author chirino
+ */
+public class Topic extends BroadcastDestination {
+
+    // It should be easy to support message priorities by
+    // creating a QoS for each priority level.
+    private final ArrayList<QualityOfService> allQos = new ArrayList<QualityOfService>();
+    private final QualityOfService transientQos = new QualityOfService();
+    private final QualityOfService persistentQos = new QualityOfService();
+
+    public Topic(Router router, ActiveMQDestination name) {
+        super(router, name);
+        allQos.add(persistentQos);
+        allQos.add(transientQos);
+    }
+
+    public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception {
+    }
+
+    public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) {
+        return true;
+    }
+
+    protected QualityOfService chooseQosFor(Message message) {
+        if (message.isPersistent()) {
+            return persistentQos;
+        } else {
+            return transientQos;
+        }
+    }
+
+    protected List<QualityOfService> getAllQos() {
+        return allQos;
+    }
+
+    @Override
+    public void start() throws Exception {
+        DataStoreManager dsm = router.getPersistentDataStoreManager();
+        DataStore ds = dsm.addStore(getName().getQualifiedName());
+        ds.setDestination(this);
+        ds.setAutoRemove(true);
+        persistentQos.setDataStore(ds);
+
+        dsm = router.getTransientDataStoreManager();
+        ds = dsm.addStore(getName().getQualifiedName());
+        ds.setAutoRemove(true);
+        ds.setDestination(this);
+        transientQos.setDataStore(ds);
+
+        super.start();
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core.topic;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.MultiDestinationClientSubscription;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.transaction.Synchronization;
+
+/**
+ * 
+ * @author chirino
+ */
+public class TopicSubscription extends MultiDestinationClientSubscription {
+
+    public TopicSubscription(Router router, ConsumerInfo info) throws Exception {
+        super(router, info);
+    }
+
+    protected void standardAck(RequestContext context, final MessageAck ack) throws Exception {
+        if (context.isInTransaction()) {
+            clientExtension += ack.getMessageCount();
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    dequeueCounter.addAndGet(ack.getMessageCount());
+                    synchronized (prefetchWindowMutex) {
+                        clientExtension -= ack.getMessageCount() * 2;
+                        if (clientExtension < 0) {
+                            clientExtension = 0;
+                        }
+                        clientPrefetchSize -= ack.getMessageCount();
+                    }
+                }
+            });
+        } else {
+            dequeueCounter.addAndGet(ack.getMessageCount());
+            synchronized (prefetchWindowMutex) {
+                clientExtension -= ack.getMessageCount();
+                if (clientExtension < 0) {
+                    clientExtension = 0;
+                }
+                clientPrefetchSize -= ack.getMessageCount();
+            }
+        }
+    }
+
+    protected void deliveredAck(MessageAck ack) throws JMSException {
+        synchronized (prefetchWindowMutex) {
+            clientExtension += ack.getMessageCount();
+        }
+    }
+
+    protected void poisonAck(MessageAck ack) throws JMSException {
+        // Can't do much about it... We don't DLQ stuff sent to topics because 1
+        // poison
+        // message could generate a bunch of poisonAcks and then you would
+        // end up with duplicate messages in a DLQ.
+        throw new JMSException("Poison acks not supported on a topic please check that your client is compaible with this broker.");
+    }
+
+    protected void redeliveredAck(MessageAck ack) throws JMSException {
+        // This also is not supported in the topic case. Just cause a message
+        // has been re-delivered to 1 consumer multiple times does not mean that
+        // message
+        // to other topic consumer need their re-delivery counter incremented.
+        throw new JMSException("Poison acks not supported on a topic please check that your client is compaible with this broker.");
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.api;
+
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * The DataStore interface is what is used to store messages in the system. It
+ * also allows you to create child ReferenceStore objects which are ideal for
+ * storing client subscriptions.
+ * 
+ * @author chirino
+ */
+public interface DataIndex extends Index, IndexManager<ReferenceIndex> {
+
+    /**
+     * Adds a record to the index.
+     * 
+     * @param long1
+     * 
+     * @param id
+     * @param location
+     * @param tx
+     * @param onCompleted
+     * @throws Exception
+     */
+    public IndexEntry addMessage(long id, Location location) throws Exception;
+
+    /**
+     * This could be a duplicate operation.. Make sure that the location is not
+     * in the list already and then add.
+     * 
+     * @param location
+     */
+    public IndexEntry redoAddMessage(long id, Location location) throws Exception;
+
+    /**
+     */
+    public void removeUnreferencedRecords(IndexEntry until) throws Exception;
+
+    /**
+     * Can the store auto remove messages that have no references and that are
+     * not loaded.
+     * 
+     * @param enable
+     */
+    public void setAutoRemove(boolean enable);
+
+    public boolean contains(long id) throws Exception;
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.api;
+
+import java.util.Set;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * This if the root object of a persistence System in ActiveMQ. It has a life
+ * cycle and allows you to create DataIndex objects for messaging destinations.
+ * The DataIndexs hold messages in a sequenced list.
+ * 
+ * It also provides the interface storing the state of the transaction operating
+ * against the broker.
+ * 
+ * @author chirino
+ * 
+ */
+public interface DataIndexManager extends IndexManager<DataIndex>, Service {
+
+    /**
+     * Flush write buffer and return once all data has landed on disk.
+     * 
+     */
+    public void sync() throws Exception;
+
+    /**
+     * Removes all data from the indexes.
+     */
+    public void clear();
+
+    /**
+     * 
+     * @return
+     * @throws Exception
+     */
+    public Location getLastAddLocation() throws Exception;
+
+    public Set<Integer> getDataFileIdsInUse() throws Exception;
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.api;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author chirino
+ */
+public interface Index {
+
+    /**
+     * The name of the index.
+     * 
+     * @return
+     */
+    public String getName();
+
+    public void setProperties(Map<String, String> properties) throws Exception;
+
+    public Map<String, String> getProperties() throws Exception;
+
+    /**
+     * The number of records in the index.
+     * 
+     * @return
+     * @throws Exception
+     */
+    public long size() throws Exception;
+
+    public IndexEntry getLastAddedId() throws Exception;
+
+    public boolean remove(long id) throws Exception;
+
+    public void redoRemove(long id) throws Exception;
+
+    public List<IndexEntry> load(IndexEntry first, IndexEntry last, int max) throws Exception;
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.api;
+
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * A cache entry is used to reference count the number of user interested in a
+ * message. A data store's cache will keep entries that in use in memory and
+ * move those that are not in use out of memory.
+ * 
+ * @author chirino
+ */
+public interface IndexEntry {
+
+    /**
+     * The key to the message in the DataIndex.
+     * 
+     * @return
+     */
+    public long getId();
+
+    /**
+     * Gives you the location of the message in the journal.
+     * 
+     * @return
+     */
+    public Location getLocation();
+
+    public DataIndex getIndex();
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.api;
+
+import java.util.List;
+
+/**
+ * The StoreManager allows you to add/find/delete Stores in the system.
+ * 
+ * @author chirino
+ */
+public interface IndexManager<T extends Index> {
+
+    public T addStore(String name) throws Exception;
+
+    public List<T> getStores() throws Exception;
+
+    /**
+     * Gets a previously created store.
+     * 
+     * @param name
+     * @return null if the store does not exist.
+     * @throws Exception
+     */
+    public T getStore(String name) throws Exception;
+
+    public void removeStore(T store) throws Exception;
+
+}



Mime
View raw message