Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 89370 invoked from network); 25 Jun 2010 15:46:41 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 25 Jun 2010 15:46:41 -0000 Received: (qmail 16780 invoked by uid 500); 25 Jun 2010 15:46:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 16717 invoked by uid 500); 25 Jun 2010 15:46:40 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 16710 invoked by uid 99); 25 Jun 2010 15:46:40 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 15:46:40 +0000 X-ASF-Spam-Status: No, hits=-1900.1 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 15:46:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BBF5A23889D2; Fri, 25 Jun 2010 15:45:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r958009 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/Queue.java broker/region/Topic.java store/kahadb/KahaDBTransactionStore.java Date: Fri, 25 Jun 2010 15:45:47 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100625154547.BBF5A23889D2@eris.apache.org> Author: rajdavies Date: Fri Jun 25 15:45:47 2010 New Revision: 958009 URL: http://svn.apache.org/viewvc?rev=958009&view=rev Log: transaction performance enhancements Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=958009&r1=958008&r2=958009&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 25 15:45:47 2010 @@ -102,7 +102,7 @@ public class Queue extends BaseDestinati private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); - private final Lock sendLock = new ReentrantLock(); + final Lock sendLock = new ReentrantLock(); private ExecutorService executor; protected final Map messagesWaitingForSpace = Collections .synchronizedMap(new LinkedHashMap()); @@ -616,9 +616,6 @@ public class Queue extends BaseDestinati @Override public void beforeCommit() throws Exception { sendLock.lockInterruptibly(); - } - @Override - public void afterCommit() throws Exception { try { // It could take while before we receive the commit // op, by that time the message could have expired.. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=958009&r1=958008&r2=958009&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jun 25 15:45:47 2010 @@ -80,10 +80,11 @@ public class Topic extends BaseDestinati }; }; - public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { + public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, + DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { super(brokerService, store, destination, parentStats); this.topicStore = store; - //set default subscription recovery policy + // set default subscription recovery policy subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); } @@ -92,7 +93,8 @@ public class Topic extends BaseDestinati public void initialize() throws Exception { super.initialize(); if (store != null) { - // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. + // AMQ-2586: Better to leave this stat at zero than to give the user + // misleading metrics. // int messageCount = store.getMessageCount(); // destinationStatistics.getMessages().setCount(messageCount); } @@ -147,7 +149,8 @@ public class Topic extends BaseDestinati } } - public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) + throws Exception { if (!sub.getConsumerInfo().isDurable()) { destinationStatistics.getConsumers().decrement(); synchronized (consumers) { @@ -166,7 +169,7 @@ public class Topic extends BaseDestinati // deactivate and remove removed.deactivate(false); consumers.remove(removed); - } + } } } @@ -267,7 +270,8 @@ public class Topic extends BaseDestinati final ConnectionContext context = producerExchange.getConnectionContext(); final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); - final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); + final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 + && !context.isInRecoveryMode(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. @@ -289,17 +293,24 @@ public class Topic extends BaseDestinati if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName() - + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." - + " See http://activemq.apache.org/producer-flow-control.html for more info"); + LOG + .info("Usage Manager memory limit (" + + memoryUsage.getLimit() + + ") reached for " + + getActiveMQDestination().getQualifiedName() + + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " - + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" + + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - // We can avoid blocking due to low usage if the producer is sending + // We can avoid blocking due to low usage if the producer is + // sending // a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { @@ -319,7 +330,8 @@ public class Topic extends BaseDestinati } if (sendProducerAck) { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message + .getSize()); context.getConnection().dispatchAsync(ack); } else { Response response = new Response(); @@ -338,7 +350,8 @@ public class Topic extends BaseDestinati } }); - // If the user manager is not full, then the task will not + // If the user manager is not full, then the task will + // not // get called.. if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { // so call it directly here. @@ -349,10 +362,11 @@ public class Topic extends BaseDestinati } } else { - // Producer flow control cannot be used, so we have do the flow + // Producer flow control cannot be used, so we have do the + // flow // control at the broker // by blocking this thread until there is space available. - + if (memoryUsage.isFull()) { if (context.isInTransaction()) { @@ -364,12 +378,20 @@ public class Topic extends BaseDestinati if (count > 2 && context.isInTransaction()) { count = 0; int size = context.getTransaction().size(); - LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message); + LOG.warn("Waiting for space to send transacted message - transaction elements = " + + size + " need more space to commit. Message = " + message); } } } else { - waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " - + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); + waitForSpace( + context, + memoryUsage, + "Usage Manager memory limit reached. Stopping producer (" + + message.getProducerId() + + ") to prevent flooding " + + getActiveMQDestination().getQualifiedName() + + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } } @@ -403,7 +425,8 @@ public class Topic extends BaseDestinati * @throws IOException * @throws Exception */ - synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) + throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); @@ -411,15 +434,17 @@ public class Topic extends BaseDestinati if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { - final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); - } - result = topicStore.asyncAddTopicMessage(context, message); + } + result = topicStore.asyncAddTopicMessage(context, message); } message.incrementReferenceCount(); @@ -427,7 +452,7 @@ public class Topic extends BaseDestinati if (context.isInTransaction()) { context.getTransaction().addSynchronization(new Synchronization() { @Override - public void afterCommit() throws Exception { + public void beforeCommit() throws Exception { // It could take while before we receive the commit // operration.. by that time the message could have // expired.. @@ -454,10 +479,10 @@ public class Topic extends BaseDestinati } if (result != null && !result.isCancelled()) { try { - result.get(); - }catch(CancellationException e) { - //ignore - the task has been cancelled if the message - // has already been deleted + result.get(); + } catch (CancellationException e) { + // ignore - the task has been cancelled if the message + // has already been deleted } } @@ -472,7 +497,8 @@ public class Topic extends BaseDestinati return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); } - public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { + public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, + final MessageReference node) throws IOException { if (topicStore != null && node.isPersistent()) { DurableTopicSubscription dsub = (DurableTopicSubscription) sub; SubscriptionKey key = dsub.getSubscriptionKey(); @@ -580,7 +606,8 @@ public class Topic extends BaseDestinati } protected void dispatch(final ConnectionContext context, Message message) throws Exception { - // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. + // AMQ-2586: Better to leave this stat at zero than to give the user + // misleading metrics. // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); dispatchValve.increment(); @@ -612,7 +639,8 @@ public class Topic extends BaseDestinati public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { broker.messageExpired(context, reference); - // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. + // AMQ-2586: Better to leave this stat at zero than to give the user + // misleading metrics. // destinationStatistics.getMessages().decrement(); destinationStatistics.getEnqueues().decrement(); destinationStatistics.getExpired().increment(); @@ -626,11 +654,10 @@ public class Topic extends BaseDestinati LOG.error("Failed to remove expired Message from the store ", e); } } - + @Override protected Log getLog() { return LOG; } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=958009&r1=958008&r2=958009&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Jun 25 15:45:47 2010 @@ -244,7 +244,6 @@ public class KahaDBTransactionStore impl doneSomething = true; } } - if (postCommit != null) { postCommit.run(); }