Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 64966 invoked from network); 4 Mar 2008 21:03:26 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Mar 2008 21:03:26 -0000 Received: (qmail 41415 invoked by uid 500); 4 Mar 2008 21:03:22 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 41387 invoked by uid 500); 4 Mar 2008 21:03:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 41378 invoked by uid 99); 4 Mar 2008 21:03:22 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2008 13:03:22 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2008 21:02:51 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9EAC11A983A; Tue, 4 Mar 2008 13:03:01 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r633639 [2/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active... 
Date: Tue, 04 Mar 2008 21:01:57 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080304210301.9EAC11A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.core; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import org.apache.activemq.broker.router.api.ClientConnection; +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.Destination; +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.api.StoreSubscription; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.filter.LogicExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NoLocalExpression; +import org.apache.activemq.selector.SelectorParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This client subscription can subscribe to multiple destinations. 
+ * + * @author chirino + */ +abstract public class MultiDestinationClientSubscription implements ClientSubscription { + + static final private Log LOG = LogFactory.getLog(MultiDestinationClientSubscription.class); + + protected final Router router; + protected final ConsumerInfo info; + protected final BooleanExpression selector; + protected final DestinationFilter destinationFilter; + protected final int maxPrefetchSize; + + protected ClientConnection clientConnection; + protected boolean started; + + // Useful counters that are atomic so that folks can view them + // without locking the mutex. + protected AtomicLong enqueueCounter = new AtomicLong(0); + protected AtomicLong transmitCounter = new AtomicLong(0); + protected AtomicLong dequeueCounter = new AtomicLong(0); + + // We access stores via copy on write since writes are infrequent but need + // to do async reads of it. + protected ArrayList sleepingStores = new ArrayList(); + protected ArrayList stores = new ArrayList(); + + protected final Object prefetchWindowMutex = new Object() { + }; + // If the client requested that the prefetch window get extended. + protected int clientExtension; + // The number of un-acked messages in transit to the client. + protected int clientPrefetchSize; + + public MultiDestinationClientSubscription(Router router, ConsumerInfo info) throws Exception { + this.router = router; + this.info = info; + this.selector = createSelector(info); + this.destinationFilter = DestinationFilter.parseFilter(info.getDestination()); + this.maxPrefetchSize = info.getPrefetchSize(); + } + + public ConsumerInfo getInfo() { + return info; + } + + public void addStoreSubscription(StoreSubscription destination) { + synchronized (prefetchWindowMutex) { + // We access stores via copy on write since writes are infrequent + // but need to do async reads of it. 
+ stores = new ArrayList(stores); + stores.add(destination); + } + } + + public void removeStoreSubscription(StoreSubscription destination) { + synchronized (prefetchWindowMutex) { + // We access stores via copy on write since writes are infrequent + // but need to do async reads of it. + stores = new ArrayList(stores); + stores.remove(destination); + } + } + + public void setClientConnection(ClientConnection clientConnection) { + this.clientConnection = clientConnection; + } + + public boolean matches(ActiveMQDestination destination) { + return destinationFilter.matches(destination); + } + + public ActiveMQDestination getDestinationName() { + return info.getDestination(); + } + + public boolean matches(Message node, MessageEvaluationContext context) throws IOException { + ConsumerId targetConsumerId = node.getTargetConsumerId(); + if (targetConsumerId != null) { + if (!targetConsumerId.equals(info.getConsumerId())) { + return false; + } + } + try { + return (selector == null || selector.matches(context)) && this.clientConnection.isAllowedToConsume(node); + } catch (JMSException e) { + return false; + } + } + + protected static BooleanExpression createSelector(ConsumerInfo info) throws InvalidSelectorException { + BooleanExpression rc = null; + if (info.getSelector() != null) { + rc = new SelectorParser().parse(info.getSelector()); + } + if (info.isNoLocal()) { + if (rc == null) { + rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); + } else { + rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); + } + } + if (info.getAdditionalPredicate() != null) { + if (rc == null) { + rc = info.getAdditionalPredicate(); + } else { + rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); + } + } + return rc; + } + + public void start() throws Exception { + synchronized (prefetchWindowMutex) { + started = true; + router.addSubscription(this); + if (!stores.isEmpty()) { + for (StoreSubscription store : 
stores) { + store.wakeup(); + } + } + } + } + + public void stop() throws Exception { + synchronized (prefetchWindowMutex) { + started = false; + router.removeSubscription(this); + } + } + + public boolean offer(StoreSubscription source, final CacheEntry reference) throws Exception { + synchronized (prefetchWindowMutex) { + boolean clientPrefetchFulll = clientPrefetchSize >= (maxPrefetchSize + clientExtension); + if (clientPrefetchFulll) { + sleepingStores.add(source); + return false; + } + clientPrefetchSize++; + } + + reference.load(); + Destination destination = reference.getStore().getDestination(); + if (destination.lockForDispatch(this, reference)) { + enqueueCounter.incrementAndGet(); + final MessageDispatch md = new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setDestination(info.getDestination()); + md.setMessage(reference.getMessage()); + md.setTransmitCallback(new Runnable() { + public void run() { + onTransmit(reference); + } + }); + + // We have to synchronize here because onDispatch may want to know + // the + // exact order that the message was sent in cause the ack handling + // code + // is dispatch order dependent. 
+ dispatch(reference, md); + } else { + synchronized (prefetchWindowMutex) { + clientPrefetchSize--; + } + reference.unload(); + } + return true; + } + + public void offer(StoreSubscription source, List list) throws Exception { + // for (Iterator iterator = list.iterator(); + // iterator.hasNext();) { + // CacheEntry o = iterator.next(); + // if (offer(source, o)) { + // iterator.remove(); + // } else { + // break; + // } + // } + while (!list.isEmpty()) { + int estimatedDispatch; + synchronized (prefetchWindowMutex) { + estimatedDispatch = (clientExtension + maxPrefetchSize) - clientPrefetchSize; + if (estimatedDispatch == 0) { + sleepingStores.add(source); + return; + } + estimatedDispatch = Math.min(estimatedDispatch, list.size()); + clientPrefetchSize += estimatedDispatch; + } + + int dispatched = 0; + for (Iterator iterator = list.iterator(); iterator.hasNext();) { + final CacheEntry reference = iterator.next(); + if (dispatched >= estimatedDispatch) { + break; + } + + Destination destination = reference.getStore().getDestination(); + if (destination.lockForDispatch(this, reference)) { + + assert reference.getMessage() != null : "The message should never be null if a lock is successful."; + + enqueueCounter.incrementAndGet(); + final MessageDispatch md = new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setDestination(info.getDestination()); + md.setMessage(reference.getMessage()); + md.setTransmitCallback(new Runnable() { + public void run() { + onTransmit(reference); + } + }); + + // We have to synchronize here because onDispatch may want + // to know the + // exact order that the message was sent in cause the ack + // handling code + // is dispatch order dependent. 
+ dispatch(reference, md); + dispatched++; + } else { + reference.unload(); + } + iterator.remove(); + } + int diff = (estimatedDispatch - dispatched); + if (diff > 0) { + synchronized (prefetchWindowMutex) { + clientPrefetchSize -= diff; + if (dispatched == 0) { + sleepingStores.add(source); + } + } + } + } + } + + /** + * @param ref + * @param md + */ + protected void dispatch(final CacheEntry ref, final MessageDispatch md) { + clientConnection.getTransmissionQueue().enqueue(md); + } + + /** + * Run when the message is actually sent to the client. + * + * @param ref + */ + protected void onTransmit(final CacheEntry ref) { + // Let the message get unloaded from memory. Not + // needed anymore. + transmitCounter.incrementAndGet(); + ref.unload(); + } + + public void acknowledge(RequestContext context, MessageAck ack) throws Exception { + if (ack.isStandardAck()) { + standardAck(context, ack); + } else if (ack.isDeliveredAck()) { + deliveredAck(ack); + } else if (ack.isRedeliveredAck()) { + redeliveredAck(ack); + } else if (ack.isPoisonAck()) { + poisonAck(ack); + } + wakeup(); + } + + public void wakeup() { + ArrayList stores = null; + synchronized (prefetchWindowMutex) { + int capacity = (clientExtension + maxPrefetchSize) - clientPrefetchSize; + if (started && capacity > (maxPrefetchSize / 2) && !this.sleepingStores.isEmpty()) { + stores = this.sleepingStores; + this.sleepingStores = new ArrayList(stores.size()); + } + } + if (stores != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waking up cause we have enough capacitly to load a big batch: " + this.getInfo().getConsumerId() + ", stores avail: " + stores.size()); + } + for (StoreSubscription store : stores) { + store.wakeup(); + } + } + } + + abstract protected void poisonAck(MessageAck ack) throws Exception; + + abstract protected void redeliveredAck(MessageAck ack) throws Exception; + + abstract protected void deliveredAck(MessageAck ack) throws Exception; + + abstract protected void 
standardAck(RequestContext context, MessageAck ack) throws Exception; + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.core; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.Service; +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.ReferenceStore; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.MessageEvaluationContext; + +/** + * + * @author chirino + */ +public class QualityOfService implements Service { + + public static final String TRANSIENT_PREFIX = "transient:"; + private LinkedList subscriptions = new LinkedList(); + private DataStore dataStore; + public long lastId = 0; + public CacheEntry lastAddedMessage; + + public void enqueue(RequestContext ctx, Message message, Runnable onStored) throws Exception { + + CacheEntry reference = null; + LinkedList subs; + try { + synchronized (this) { + lastId++; + lastAddedMessage = reference = dataStore.addMessage(lastId, message, onStored); + subs = subscriptions; + } + + // Wake up the subscriptions so that they push messages out to the + // clients. + MessageEvaluationContext ec = new MessageEvaluationContext(); + ec.setDestination(dataStore.getDestination().getName()); + ec.setMessageReference(message); + + for (BroadcastStoreSubscription subscription : subs) { + // Lock the subscription state so that we can properly either + // dispatch the message or put it on the pending message + // store. 
+ if (subscription.matches(message, ec)) { + subscription.offer(reference); + } + } + } catch (Exception e) { + e.printStackTrace(); + throw e; + } finally { + if (reference != null) { + reference.unload(); + } + } + + } + + public void addSubscription(ClientSubscription subscription) throws Exception { + synchronized (this) { + subscriptions = new LinkedList(subscriptions); + ReferenceStore store = dataStore.addStore(TRANSIENT_PREFIX + subscription.getInfo().getConsumerId()); + BroadcastStoreSubscription ss = new BroadcastStoreSubscription(dataStore, store, subscription); + if (lastAddedMessage != null) { + ss.recoverUntil(lastAddedMessage); + } + subscriptions.add(ss); + subscription.addStoreSubscription(ss); + ss.start(); + subscription.wakeup(); + } + } + + public void removeSubscription(ClientSubscription subscription) throws Exception { + synchronized (this) { + subscriptions = new LinkedList(subscriptions); + for (Iterator iterator = subscriptions.iterator(); iterator.hasNext();) { + BroadcastStoreSubscription ss = iterator.next(); + if (ss.getClientSubscription() == subscription) { + ss.getDataStore().removeStore(ss.getStore()); + iterator.remove(); + break; + } + } + } + } + + public DataStore getDataStore() { + return dataStore; + } + + public void setDataStore(DataStore dataStore) throws Exception { + this.dataStore = dataStore; + } + + public void start() throws Exception { + + if (dataStore == null) { + throw new IllegalArgumentException("The dataStore property must be set before being started."); + } + + lastAddedMessage = dataStore.getLastAddedEntry(); + if (lastAddedMessage != null) { + lastId = lastAddedMessage.getId(); + } + + // Delete any transient stores that may have been left around since the + // last restart. 
+ List stores = dataStore.getStores(); + for (ReferenceStore s : stores) { + if (s.getName().startsWith(TRANSIENT_PREFIX)) { + dataStore.removeStore(s); + } + } + } + + public void stop() throws Exception { + List stores = dataStore.getStores(); + for (ReferenceStore s : stores) { + if (s.getName().startsWith(TRANSIENT_PREFIX)) { + dataStore.removeStore(s); + } + } + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.core; + +import java.util.HashMap; +import java.util.Set; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.Destination; +import org.apache.activemq.broker.router.api.DestinationManager; +import org.apache.activemq.broker.router.core.SimpleDestinationManager.DestinationManagerCallback; +import org.apache.activemq.broker.router.core.queue.BroadcastQueue; +import org.apache.activemq.broker.router.core.queue.QueueSubscription; +import org.apache.activemq.broker.router.core.topic.Topic; +import org.apache.activemq.broker.router.core.topic.TopicSubscription; +import org.apache.activemq.broker.router.store.api.DataStoreManager; +import org.apache.activemq.broker.router.util.Selector; +import org.apache.activemq.broker.router.util.SelectorThreadPool; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; + +/** + * + * @author chirino + */ +public class Router implements DestinationManagerCallback { + + private final HashMap subscriptions = new HashMap(); + // Used to monitor subscriptions that need someone to fill their + // server side prefetch buffers. 
+ private final Selector prefetchSelector = new Selector(); + + private DestinationManager destinationManager; + private SelectorThreadPool threadPool; + private String brokerName; + private DataStoreManager persistentDataStoreManager; + private DataStoreManager transientDataStoreManager; + + // //////////////////////////////////////////////////////////////////////// + // Lifecycle + // //////////////////////////////////////////////////////////////////////// + + public void start() throws Exception { + if (persistentDataStoreManager == null) { + throw new IllegalArgumentException("persistentDataStoreManager must be set"); + } + if (transientDataStoreManager == null) { + throw new IllegalArgumentException("transientDataStoreManager must be set"); + } + if (threadPool == null) { + throw new IllegalArgumentException("threadPool must be set"); + } + if (destinationManager == null) { + throw new IllegalArgumentException("destinationManager must be set"); + } + if (brokerName == null) { + throw new IllegalArgumentException("brokerName must be set"); + } + persistentDataStoreManager.start(); + transientDataStoreManager.start(); + destinationManager.setDestinationManagerCallback(this); + threadPool.add(prefetchSelector); + threadPool.start(); + } + + public void stop() throws Exception { + transientDataStoreManager.stop(); + persistentDataStoreManager.stop(); + threadPool.stop(); + } + + // //////////////////////////////////////////////////////////////////////// + // DestinationManagerCallback Methods + // //////////////////////////////////////////////////////////////////////// + + public BroadcastQueue createQueue(ActiveMQQueue name) { + return new BroadcastQueue(this, name); + } + + public Topic createTopic(ActiveMQTopic name) { + return new Topic(this, name); + } + + /** + * Matches up existing subscriptions with this destination. 
+ * + * @param destination + * @throws Exception + * @throws JMSException + */ + public void addDestination(Destination destination) throws Exception { + synchronized (destinationManager.getMutex()) { + // TODO: catch the exception and undo the subscription.. + // Add all consumers that are interested in the destination. + for (ClientSubscription sub : subscriptions.values()) { + if (sub.matches(destination.getName())) { + destination.addSubscription(sub); + } + } + } + } + + public void removeDestination(Destination destination, boolean force) throws Exception { + // Safety check.. + synchronized (destinationManager.getMutex()) { + if (!force) { + int counter = 0; + for (ClientSubscription sub : subscriptions.values()) { + if (sub.matches(destination.getName())) { + counter++; + } + } + if (counter != 0) { + throw new JMSException("Destination '" + destination + "' still has an active subscriptions: " + counter); + } + } + + for (ClientSubscription sub : subscriptions.values()) { + if (sub.matches(destination.getName())) { + destination.removeSubscription(sub); + } + } + } + } + + // //////////////////////////////////////////////////////////////////////// + // Subscription Management + // //////////////////////////////////////////////////////////////////////// + + /** + * Used by the ClientConnection + */ + public ClientSubscription createSubscription(ConsumerInfo consumerInfo) throws Exception { + ClientSubscription rc = null; + ActiveMQDestination name = consumerInfo.getDestination(); + if (name.isQueue()) { + rc = new QueueSubscription(this, consumerInfo); + } else { + rc = new TopicSubscription(this, consumerInfo); + } + return rc; + } + + /** + * Used in the Subscription.start() + * + * @throws Exception + */ + public void addSubscription(ClientSubscription subscription) throws Exception { + synchronized (destinationManager.getMutex()) { + // TODO: catch the exception and undo the subscription.. 
+ subscriptions.put(subscription.getInfo().getConsumerId(), subscription); + ActiveMQDestination name = subscription.getDestinationName(); + Set destinations = destinationManager.getDestinations(name); + for (Destination destination : destinations) { + destination.addSubscription(subscription); + } + } + } + + /** + * Used in the Subscription.stop() + * + * @throws Exception + */ + public void removeSubscription(ClientSubscription subscription) throws Exception { + synchronized (destinationManager.getMutex()) { + ActiveMQDestination name = subscription.getDestinationName(); + Set destinations = destinationManager.getDestinations(name); + for (Destination destination : destinations) { + destination.removeSubscription(subscription); + } + subscriptions.remove(subscription.getInfo().getConsumerId()); + } + } + + // //////////////////////////////////////////////////////////////////////// + // Accessors and Mutators + // //////////////////////////////////////////////////////////////////////// + + public Selector getDestinationPrefetchSelector() { + return prefetchSelector; + } + + public DestinationManager getDestinationManager() { + return destinationManager; + } + + /** + * Flushes all possible caches associated with this broker. 
+ */ + public void fushCaches() { + } + + public ClientSubscription getSubscription(ConsumerId consumerId) { + return null; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public DataStoreManager getPersistentDataStoreManager() { + return persistentDataStoreManager; + } + + public DataStoreManager getTransientDataStoreManager() { + return transientDataStoreManager; + } + + public void setTransientDataStoreManager(DataStoreManager transientDataStoreManager) { + this.transientDataStoreManager = transientDataStoreManager; + } + + public void setPersistentDataStoreManager(DataStoreManager persistentDataStoreManager) { + this.persistentDataStoreManager = persistentDataStoreManager; + } + + public SelectorThreadPool getThreadPool() { + return threadPool; + } + + public void setThreadPool(SelectorThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setDestinationManager(DestinationManager destinationManager) { + this.destinationManager = destinationManager; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.core; + +import org.apache.activemq.broker.router.api.DestinationManager; +import org.apache.activemq.broker.router.store.api.DataStoreManager; +import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory; +import org.apache.activemq.broker.router.store.memory.MemoryDataStoreManager; +import org.apache.activemq.broker.router.util.SelectorThreadPool; + +/** + * + * @author chirino + */ +public class RouterFactory { + + protected String brokerName; + protected DataStoreManager persistentDataStoreManager; + protected DataStoreManager transientDataStoreManager; + protected SelectorThreadPool threadPool; + protected DestinationManager destinationManager; + + public Router createRouter() throws Exception { + Router rc = new Router(); + rc.setThreadPool(getThreadPool()); + rc.setDestinationManager(getDestinationManager()); + rc.setBrokerName(getBrokerName()); + rc.setPersistentDataStoreManager(getPersistentDataStoreManager()); + rc.setTransientDataStoreManager(getTransientDataStoreManager()); + return rc; + } + + public String getBrokerName() { + if (brokerName == null) { + brokerName = "localhost"; + } + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public DataStoreManager getPersistentDataStoreManager() throws Exception { + if (persistentDataStoreManager 
== null) { + JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory(); + persistentDataStoreManager = factory.createJournalDataStoreManager(); + } + return persistentDataStoreManager; + } + + public void setPersistentDataStoreManager(DataStoreManager persistentDataStoreManager) { + this.persistentDataStoreManager = persistentDataStoreManager; + } + + public DataStoreManager getTransientDataStoreManager() { + if (transientDataStoreManager == null) { + transientDataStoreManager = new MemoryDataStoreManager(); + } + return transientDataStoreManager; + } + + public void setTransientDataStoreManager(DataStoreManager transientDataStoreManager) { + this.transientDataStoreManager = transientDataStoreManager; + } + + public SelectorThreadPool getThreadPool() { + if (threadPool == null) { + threadPool = new SelectorThreadPool("ActiveMQ " + brokerName, 10, 50); + } + return threadPool; + } + + public void setThreadPool(SelectorThreadPool threadPool) { + this.threadPool = threadPool; + } + + public DestinationManager getDestinationManager() { + if (destinationManager == null) { + destinationManager = new SimpleDestinationManager(); + } + return destinationManager; + } + + public void setDestinationManager(DestinationManager destinationManager) { + this.destinationManager = destinationManager; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,157 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.core; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.router.api.Destination; +import org.apache.activemq.broker.router.api.DestinationManager; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +/** + * + * @author chirino + */ +public class SimpleDestinationManager implements DestinationManager { + + private final HashMap destinations = new HashMap(); + private boolean autoCreate = true; + private final Object mutex = new Object(); + private DestinationManagerCallback destinationManagerCallback; + + public interface DestinationManagerCallback { + Destination createQueue(ActiveMQQueue name); + + Destination createTopic(ActiveMQTopic name); + + void addDestination(Destination destination) throws Exception; + + void removeDestination(Destination destination, boolean force) throws Exception; + } + + public SimpleDestinationManager() { + } + + /** + * @see 
org.apache.activemq.broker.router.perf.core.DestinationManager#getDestinations(org.apache.activemq.command.ActiveMQDestination) + */ + public Set getDestinations(ActiveMQDestination name) throws Exception { + if (name.isPattern()) { + throw new IllegalArgumentException("This destination manager does not support wild card names."); + } + + if (name.isComposite()) { + HashSet rc = new HashSet(); + ActiveMQDestination[] compositeDestinations = name.getCompositeDestinations(); + for (int i = 0; i < compositeDestinations.length; i++) { + Set a = getDestinations(compositeDestinations[i]); + rc.addAll(a); + } + return rc; + } + + synchronized (mutex) { + Destination destination = destinations.get(name); + if (destination == null) { + if (autoCreate) { + destination = createDestination(name); + } else { + throw new JMSException("Could not find destination: " + name); + } + } + return Collections.singleton(destination); + } + + } + + /** + * @see org.apache.activemq.broker.router.perf.core.DestinationManager#createDestination(org.apache.activemq.command.ActiveMQDestination) + */ + public Destination createDestination(ActiveMQDestination name) throws Exception { + Destination destination; + + byte type = name.getDestinationType(); + switch (type) { + case ActiveMQDestination.QUEUE_TYPE: + destination = destinationManagerCallback.createQueue((ActiveMQQueue) name); + break; + case ActiveMQDestination.TOPIC_TYPE: + destination = destinationManagerCallback.createTopic((ActiveMQTopic) name); + break; + default: + throw new IllegalStateException("Invalid destination type: " + type); + } + + destination.start(); + + synchronized (mutex) { + destinations.put(name, destination); + destinationManagerCallback.addDestination(destination); + } + return destination; + } + + /** + * @see org.apache.activemq.broker.router.perf.core.DestinationManager#destroyDestination(org.apache.activemq.command.ActiveMQDestination, + * boolean) + */ + public void destroyDestination(ActiveMQDestination 
name, boolean force) throws Exception { + Set rc = getDestinations(name); + for (Destination d : rc) { + d.stop(); + destinationManagerCallback.removeDestination(d, force); + destinations.remove(d.getName()); + } + } + + /** + * @see org.apache.activemq.broker.router.perf.core.DestinationManager#isAutoCreate() + */ + public boolean isAutoCreate() { + return autoCreate; + } + + /** + * @see org.apache.activemq.broker.router.perf.core.DestinationManager#setAutoCreate(boolean) + */ + public void setAutoCreate(boolean autoCreate) { + this.autoCreate = autoCreate; + } + + /** + * @see org.apache.activemq.broker.router.perf.core.DestinationManager#getMutex() + */ + public Object getMutex() { + return mutex; + } + + public DestinationManagerCallback getDestinationManagerCallback() { + return destinationManagerCallback; + } + + public void setDestinationManagerCallback(DestinationManagerCallback destinationManagerCallback) { + this.destinationManagerCallback = destinationManagerCallback; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.core; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.broker.router.api.ClientConnection; +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.Destination; +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.Response; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportListener; + +/** + * + * @author 
chirino + */ +public class TransportClientConnection implements ClientConnection { + + private final Transport transport; + private final Router router; + + public final class TansportTransmitQueue implements TransmitQueue { + List queue = new LinkedList(); + + synchronized private Command dequeue(long timeout) throws InterruptedException { + if (queue.isEmpty()) { + wait(timeout); + if (queue.isEmpty()) { + return null; + } + } + Command rc = queue.remove(0); + return rc; + } + + synchronized public void enqueue(Command command) { + queue.add(command); + notify(); + } + + synchronized public void enqueueFirst(Command command) { + queue.add(0, command); + notify(); + } + } + + private final TansportTransmitQueue transmissionQueue = new TansportTransmitQueue(); + + // Keeps track of client state. + private final Map subscriptions = Collections.synchronizedMap(new HashMap()); + + public TransportClientConnection(Router router, Transport transport) { + this.router = router; + this.transport = transport; + + this.transport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + TransportClientConnection.this.onCommand((Command) command); + } + + public void onException(IOException error) { + onTransportError(error); + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + }); + + } + + public void start() throws Exception { + transport.start(); + Thread thread = new Thread("ActiveMQ Transmit To: " + transport.getRemoteAddress()) { + @Override + public void run() { + try { + Command command = transmissionQueue.dequeue(100); + while (command != null) { + transport.oneway(command); + if (command.getDataStructureType() == MessageDispatch.DATA_STRUCTURE_TYPE) { + ((MessageDispatch) command).getTransmitCallback().run(); + } + command = transmissionQueue.dequeue(100); + } + } catch (Throwable e) { + onTransmitError(e); + } + } + }; + thread.setPriority(9); + thread.start(); + } + + public void stop() throws 
Exception { + transport.stop(); + } + + public void onCommand(Command command) { + + RequestContext requestContext = new RequestContext(); + requestContext.router = router; + requestContext.clientConnection = this; + requestContext.command = command; + + Response response = null; + int commandId = command.getCommandId(); + try { + switch (command.getDataStructureType()) { + case ActiveMQMessage.DATA_STRUCTURE_TYPE: + case ActiveMQTextMessage.DATA_STRUCTURE_TYPE: + case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE: + case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE: + case ActiveMQMapMessage.DATA_STRUCTURE_TYPE: + onMessage(requestContext, (Message) command); + break; + case ConsumerInfo.DATA_STRUCTURE_TYPE: + onConsumerInfo(requestContext, (ConsumerInfo) command); + break; + case MessageAck.DATA_STRUCTURE_TYPE: + onMessageAck(requestContext, (MessageAck) command); + break; + default: + throw new RuntimeException("Not yet implemented."); + } + } catch (Throwable e) { + if (command.isResponseRequired()) { + response = new ExceptionResponse(e); + } + onCommandError(e); + } + + if (response != null && requestContext.autoRespond) { + response.setCorrelationId(commandId); + getTransmissionQueue().enqueueFirst(response); + } + } + + public void onConsumerInfo(RequestContext requestContext, ConsumerInfo info) throws Exception { + if (info.isDurable()) { + throw new RuntimeException("Not yet implemented."); + } else { + final ClientSubscription subscription = router.createSubscription(info); + subscription.setClientConnection(this); + subscriptions.put(info.getConsumerId(), subscription); + subscription.start(); + } + } + + public void onMessage(RequestContext requestContext, final Message message) throws Exception { + + ActiveMQDestination name = message.getDestination(); + final Set destinations = router.getDestinationManager().getDestinations(name); + + // Don't auto response.. we will send a manual response once the message + // is securely on disk. 
+ requestContext.autoRespond = false; + Runnable completionHandler = null; + if (message.isResponseRequired()) { + completionHandler = new Runnable() { + AtomicInteger completeCounter = new AtomicInteger(destinations.size()); + + public void run() { + if (completeCounter.decrementAndGet() == 0) { + Response response = new Response(); + response.setCorrelationId(message.getCommandId()); + getTransmissionQueue().enqueueFirst(response); + } + } + }; + } + + for (Destination destination : destinations) { + destination.enqueue(requestContext, message, completionHandler); + } + } + + /** + * acknowledge a message. + * + * @param requestContext + * + * @param command + * @throws Exception + */ + private void onMessageAck(RequestContext requestContext, MessageAck ack) throws Exception { + ClientSubscription subscription = subscriptions.get(ack.getConsumerId()); + subscription.acknowledge(requestContext, ack); + } + + /** + * Error occurred while processing a client command. Called from the + * transport thread. + * + * @param e + */ + private void onCommandError(Throwable e) { + } + + /** + * Error occurred at the transport layer.. Called from the transport thread. + * + * @param error + */ + protected void onTransportError(IOException error) { + } + + /** + * Error occurred while polling for data to send the client. Called from the + * context of the transmit thread. 
+ * + * @param error + */ + protected void onTransmitError(Throwable e) { + } + + public TransmitQueue getTransmissionQueue() { + return transmissionQueue; + } + + /** + * Called by a Subscription + * + * @param node + * @return + */ + public boolean isAllowedToConsume(Message node) { + return false; + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html Tue Mar 4 13:01:41 2008 @@ -0,0 +1,25 @@ + + + + + + +The implementations of the org.apache.activemq.broker.router.api package. + + + Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.core.queue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.core.BroadcastDestination; +import org.apache.activemq.broker.router.core.QualityOfService; +import org.apache.activemq.broker.router.core.Router; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.DataStoreManager; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; + +/** + * + * @author chirino + */ +public class BroadcastQueue extends BroadcastDestination { + + // It should be easy to support message priorities by + // creating a QoS for each priority level. 
+ private final ArrayList allQos = new ArrayList(); + private final QualityOfService transientQos = new QualityOfService(); + private final QualityOfService persistentQos = new QualityOfService(); + + public BroadcastQueue(Router router, ActiveMQDestination name) { + super(router, name); + allQos.add(persistentQos); + allQos.add(transientQos); + } + + public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception { + dataStore.remove(storeId, null); + } + + public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) { + return ref.lock(); + } + + protected QualityOfService chooseQosFor(Message message) { + if (message.isPersistent()) { + return persistentQos; + } else { + return transientQos; + } + } + + protected List getAllQos() { + return allQos; + } + + @Override + public void start() throws Exception { + + DataStoreManager dsm = router.getPersistentDataStoreManager(); + DataStore ds = dsm.addStore(getName().getQualifiedName()); + ds.setDestination(this); + persistentQos.setDataStore(ds); + + dsm = router.getTransientDataStoreManager(); + ds = dsm.addStore(getName().getQualifiedName()); + ds.setDestination(this); + transientQos.setDataStore(ds); + + super.start(); + } +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.core.queue; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.core.MultiDestinationClientSubscription; +import org.apache.activemq.broker.router.core.Router; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.transaction.Synchronization; + +/** + * + * @author chirino + */ +public class QueueSubscription extends MultiDestinationClientSubscription { + + // Messages that have been dispatched to the client and are either in + // transit to the client in the prefetch or being processed. 
+ class PrefetchItem { + private final MessageId messageId; + private final Long storeId; + private final DataStore dataStore; + + public PrefetchItem(DataStore dataStore, MessageId messageId, Long storeId) { + this.dataStore = dataStore; + this.messageId = messageId; + this.storeId = storeId; + } + } + + private final Object dispatchMutex = new Object() { + }; + private final LinkedList clientPrefetch = new LinkedList(); + + public QueueSubscription(Router router, ConsumerInfo info) throws Exception { + super(router, info); + } + + protected void dispatch(final CacheEntry ref, final MessageDispatch md) { + // We sync here because we the client has to receive the messages in the + // same order placed on + // the clientPrefetch list. This is because ack processing is order + // dependent. + synchronized (dispatchMutex) { + synchronized (clientPrefetch) { + clientPrefetch.add(new PrefetchItem(ref.getStore(), md.getMessage().getMessageId(), ref.getId())); + } + super.dispatch(ref, md); + } + } + + protected void standardAck(final RequestContext context, final MessageAck ack) throws Exception { + boolean inAckRange = false; + boolean callDispatchMatched = false; + final List ackedMessages = new ArrayList(); + + // Acknowledge all dispatched messages up till the message id of + // the acknowledgment. + synchronized (clientPrefetch) { + + for (Iterator iterator = clientPrefetch.iterator(); iterator.hasNext();) { + PrefetchItem ref = iterator.next(); + MessageId messageId = ref.messageId; + + // Only keep going if within the ack range. 
+ if (!inAckRange) { + if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } else { + continue; + } + } + if (inAckRange) { + ackedMessages.add(ref); + if (!context.isInTransaction()) { + iterator.remove(); + } + + if (ack.getLastMessageId().equals(messageId)) { + callDispatchMatched = true; + break; + } + } + } + } + + if (context.isInTransaction()) { + context.getTransaction().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + dequeueCounter.addAndGet(ackedMessages.size()); + synchronized (prefetchWindowMutex) { + clientExtension -= ackedMessages.size(); + if (clientExtension < 0) { + clientExtension = 0; + } + clientPrefetchSize -= ackedMessages.size(); + } + for (final PrefetchItem ref : ackedMessages) { + synchronized (clientPrefetch) { + clientPrefetch.remove(ref); + } + ref.dataStore.getDestination().dequeue(context, ack, ref.dataStore, ref.storeId); + } + } + }); + synchronized (prefetchWindowMutex) { + if (maxPrefetchSize != 0) { + clientExtension += ackedMessages.size(); + } + } + } else { + dequeueCounter.addAndGet(ackedMessages.size()); + synchronized (prefetchWindowMutex) { + clientExtension -= ackedMessages.size(); + if (clientExtension < 0) { + clientExtension = 0; + } + clientPrefetchSize -= ackedMessages.size(); + } + for (final PrefetchItem ref : ackedMessages) { + ref.dataStore.getDestination().dequeue(context, ack, ref.dataStore, ref.storeId); + } + } + + if (!callDispatchMatched) { + throw new JMSException("Could not correlate acknowledgment with clientPrefetch message: " + ack); + } + + } + + protected void deliveredAck(MessageAck ack) throws JMSException { + synchronized (prefetchWindowMutex) { + clientExtension += ack.getMessageCount(); + } + } + + protected void poisonAck(MessageAck ack) throws JMSException { + synchronized (prefetchWindowMutex) { + + // TODO: what if the message is already in a DLQ??? 
+ // Handle the poison ACK case: we need to send the message to a + // DLQ + if (ack.isInTransaction()) { + throw new JMSException("Poison ack cannot be transacted: " + ack); + } + // Acknowledge all clientPrefetch messages up till the message id of + // the + // acknowledgment. + // int index = 0; + boolean inAckRange = false; + List removeList = new ArrayList(); + for (final PrefetchItem ref : clientPrefetch) { + MessageId messageId = ref.messageId; + if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + // sendToDLQ(context, node); + // node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); + // removeList.add(node); + // dequeueCounter++; + // index++; + // acknowledge(context, ack, node); + // if (ack.getLastMessageId().equals(messageId)) { + // prefetchExtension = Math.max(0, prefetchExtension - + // (index + 1)); + // callDispatchMatched = true; + // break; + // } + } + } + for (final Message node : removeList) { + clientPrefetch.remove(node); + } + } + } + + protected void redeliveredAck(MessageAck ack) throws JMSException { + synchronized (prefetchWindowMutex) { + boolean callDispatchMatched = false; + // Message was re-delivered but it was not yet considered to be DLQ + // message. + // Acknowledge all clientPrefetch messages up till the message id of + // the acknowledgment. + boolean inAckRange = false; + for (final PrefetchItem reference : clientPrefetch) { + MessageId messageId = reference.messageId; + if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + // TODO: figure out a way to do redeliver counter + // increments.. 
+ // reference.incrementRedeliveryCounter(); + if (ack.getLastMessageId().equals(messageId)) { + callDispatchMatched = true; + break; + } + } + } + if (!callDispatchMatched) { + throw new JMSException("Could not correlate acknowledgment with clientPrefetch message: " + ack); + } + } + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.core.topic; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.router.api.ClientSubscription; +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.core.BroadcastDestination; +import org.apache.activemq.broker.router.core.QualityOfService; +import org.apache.activemq.broker.router.core.Router; +import org.apache.activemq.broker.router.store.api.CacheEntry; +import org.apache.activemq.broker.router.store.api.DataStore; +import org.apache.activemq.broker.router.store.api.DataStoreManager; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; + +/** + * + * @author chirino + */ +public class Topic extends BroadcastDestination { + + // It should be easy to support message priorities by + // creating a QoS for each priority level. + private final ArrayList allQos = new ArrayList(); + private final QualityOfService transientQos = new QualityOfService(); + private final QualityOfService persistentQos = new QualityOfService(); + + public Topic(Router router, ActiveMQDestination name) { + super(router, name); + allQos.add(persistentQos); + allQos.add(transientQos); + } + + public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception { + } + + public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) { + return true; + } + + protected QualityOfService chooseQosFor(Message message) { + if (message.isPersistent()) { + return persistentQos; + } else { + return transientQos; + } + } + + protected List getAllQos() { + return allQos; + } + + @Override + public void start() throws Exception { + DataStoreManager dsm = router.getPersistentDataStoreManager(); + DataStore ds = dsm.addStore(getName().getQualifiedName()); + ds.setDestination(this); + ds.setAutoRemove(true); + 
persistentQos.setDataStore(ds); + + dsm = router.getTransientDataStoreManager(); + ds = dsm.addStore(getName().getQualifiedName()); + ds.setAutoRemove(true); + ds.setDestination(this); + transientQos.setDataStore(ds); + + super.start(); + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.core.topic; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.router.api.RequestContext; +import org.apache.activemq.broker.router.core.MultiDestinationClientSubscription; +import org.apache.activemq.broker.router.core.Router; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.transaction.Synchronization; + +/** + * + * @author chirino + */ +public class TopicSubscription extends MultiDestinationClientSubscription { + + public TopicSubscription(Router router, ConsumerInfo info) throws Exception { + super(router, info); + } + + protected void standardAck(RequestContext context, final MessageAck ack) throws Exception { + if (context.isInTransaction()) { + clientExtension += ack.getMessageCount(); + context.getTransaction().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + dequeueCounter.addAndGet(ack.getMessageCount()); + synchronized (prefetchWindowMutex) { + clientExtension -= ack.getMessageCount() * 2; + if (clientExtension < 0) { + clientExtension = 0; + } + clientPrefetchSize -= ack.getMessageCount(); + } + } + }); + } else { + dequeueCounter.addAndGet(ack.getMessageCount()); + synchronized (prefetchWindowMutex) { + clientExtension -= ack.getMessageCount(); + if (clientExtension < 0) { + clientExtension = 0; + } + clientPrefetchSize -= ack.getMessageCount(); + } + } + } + + protected void deliveredAck(MessageAck ack) throws JMSException { + synchronized (prefetchWindowMutex) { + clientExtension += ack.getMessageCount(); + } + } + + protected void poisonAck(MessageAck ack) throws JMSException { + // Can't do much about it... We don't DLQ stuff sent to topics because 1 + // poison + // message could generate a bunch of poisonAcks and then you would + // end up with duplicate messages in a DLQ. 
+ throw new JMSException("Poison acks not supported on a topic please check that your client is compaible with this broker."); + } + + protected void redeliveredAck(MessageAck ack) throws JMSException { + // This also is not supported in the topic case. Just because a message + // has been re-delivered to 1 consumer multiple times does not mean that + // messages + // to other topic consumers need their re-delivery counter incremented. + throw new JMSException("Poison acks not supported on a topic please check that your client is compaible with this broker."); + } + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.index.api; + +import org.apache.activemq.kaha.impl.async.Location; + +/** + * The DataStore interface is what is used to store messages in the system. It + * also allows you to create child ReferenceStore objects which are ideal for + * storing client subscriptions. + * + * @author chirino + */ +public interface DataIndex extends Index, IndexManager { + + /** + * Adds a record to the index. + * + * @param long1 + * + * @param id + * @param location + * @param tx + * @param onCompleted + * @throws Exception + */ + public IndexEntry addMessage(long id, Location location) throws Exception; + + /** + * This could be a duplicate operation.. Make sure that the location is not + * in the list already and then add. + * + * @param location + */ + public IndexEntry redoAddMessage(long id, Location location) throws Exception; + + /** + */ + public void removeUnreferencedRecords(IndexEntry until) throws Exception; + + /** + * Can the store auto remove messages that have no references and that are + * not loaded. 
+ * + * @param enable + */ + public void setAutoRemove(boolean enable); + + public boolean contains(long id) throws Exception; + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.index.api; + +import java.util.Set; + +import org.apache.activemq.Service; +import org.apache.activemq.kaha.impl.async.Location; + +/** + * This is the root object of a persistence system in ActiveMQ. It has a life + * cycle and allows you to create DataIndex objects for messaging destinations. + * The DataIndexes hold messages in a sequenced list. 
+ * + * It also provides the interface storing the state of the transaction operating + * against the broker. + * + * @author chirino + * + */ +public interface DataIndexManager extends IndexManager, Service { + + /** + * Flush write buffer and return once all data has landed on disk. + * + */ + public void sync() throws Exception; + + /** + * Removes all data from the indexes. + */ + public void clear(); + + /** + * + * @return + * @throws Exception + */ + public Location getLastAddLocation() throws Exception; + + public Set getDataFileIdsInUse() throws Exception; + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.index.api; + +import java.util.List; +import java.util.Map; + +/** + * @author chirino + */ +public interface Index { + + /** + * The name of the index. + * + * @return + */ + public String getName(); + + public void setProperties(Map properties) throws Exception; + + public Map getProperties() throws Exception; + + /** + * The number of records in the index. + * + * @return + * @throws Exception + */ + public long size() throws Exception; + + public IndexEntry getLastAddedId() throws Exception; + + public boolean remove(long id) throws Exception; + + public void redoRemove(long id) throws Exception; + + public List load(IndexEntry first, IndexEntry last, int max) throws Exception; + +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.index.api; + +import org.apache.activemq.kaha.impl.async.Location; + +/** + * A cache entry is used to reference count the number of users interested in a + * message. A data store's cache will keep entries that are in use in memory and + * move those that are not in use out of memory. + * + * @author chirino + */ +public interface IndexEntry { + + /** + * The key to the message in the DataIndex. + * + * @return + */ + public long getId(); + + /** + * Gives you the location of the message in the journal. + * + * @return + */ + public Location getLocation(); + + public DataIndex getIndex(); +} Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java?rev=633639&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java (added) +++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java Tue Mar 4 13:01:41 2008 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.router.index.api; + +import java.util.List; + +/** + * The StoreManager allows you to add/find/delete Stores in the system. + * + * @author chirino + */ +public interface IndexManager { + + public T addStore(String name) throws Exception; + + public List getStores() throws Exception; + + /** + * Gets a previously created store. + * + * @param name + * @return null if the store does not exist. + * @throws Exception + */ + public T getStore(String name) throws Exception; + + public void removeStore(T store) throws Exception; + +}