activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r958009 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/Queue.java broker/region/Topic.java store/kahadb/KahaDBTransactionStore.java
Date Fri, 25 Jun 2010 15:45:47 GMT
Author: rajdavies
Date: Fri Jun 25 15:45:47 2010
New Revision: 958009

URL: http://svn.apache.org/viewvc?rev=958009&view=rev
Log:
transaction performance enhancements

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=958009&r1=958008&r2=958009&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 25 15:45:47 2010
@@ -102,7 +102,7 @@ public class Queue extends BaseDestinati
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
-    private final Lock sendLock = new ReentrantLock();
+    final Lock sendLock = new ReentrantLock();
     private ExecutorService executor;
     protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
             .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
@@ -616,9 +616,6 @@ public class Queue extends BaseDestinati
                     @Override
                     public void beforeCommit() throws Exception {
                         sendLock.lockInterruptibly();
-                    }
-                    @Override
-                    public void afterCommit() throws Exception {
                         try {
                             // It could take while before we receive the commit
                             // op, by that time the message could have expired..

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=958009&r1=958008&r2=958009&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jun 25 15:45:47 2010
@@ -80,10 +80,11 @@ public class Topic extends BaseDestinati
         };
     };
 
-    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
+    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
+            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
         this.topicStore = store;
-        //set default subscription recovery policy
+        // set default subscription recovery policy
         subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
@@ -92,7 +93,8 @@ public class Topic extends BaseDestinati
     public void initialize() throws Exception {
         super.initialize();
         if (store != null) {
-            // AMQ-2586: Better to leave this stat at zero than to give the user misleading
metrics.
+            // AMQ-2586: Better to leave this stat at zero than to give the user
+            // misleading metrics.
             // int messageCount = store.getMessageCount();
             // destinationStatistics.getMessages().setCount(messageCount);
         }
@@ -147,7 +149,8 @@ public class Topic extends BaseDestinati
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
+            throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
             destinationStatistics.getConsumers().decrement();
             synchronized (consumers) {
@@ -166,7 +169,7 @@ public class Topic extends BaseDestinati
                 // deactivate and remove
                 removed.deactivate(false);
                 consumers.remove(removed);
-            }         
+            }
         }
     }
 
@@ -267,7 +270,8 @@ public class Topic extends BaseDestinati
         final ConnectionContext context = producerExchange.getConnectionContext();
 
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize()
> 0 && !context.isInRecoveryMode();
+        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize()
> 0
+                && !context.isInRecoveryMode();
 
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -289,17 +293,24 @@ public class Topic extends BaseDestinati
 
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ")
reached for " + getActiveMQDestination().getQualifiedName()
-                            + ". Producers will be throttled to the rate at which messages
are removed from this destination to prevent flooding it."
-                            + " See http://activemq.apache.org/producer-flow-control.html
for more info");
+                    LOG
+                            .info("Usage Manager memory limit ("
+                                    + memoryUsage.getLimit()
+                                    + ") reached for "
+                                    + getActiveMQDestination().getQualifiedName()
+                                    + ". Producers will be throttled to the rate at which
messages are removed from this destination to prevent flooding it."
+                                    + " See http://activemq.apache.org/producer-flow-control.html
for more info");
                 }
 
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager memory
limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
+ ") to prevent flooding "
-                            + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html
for more info");
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory
limit ("
+                            + memoryUsage.getLimit() + ") reached. Stopping producer (" +
message.getProducerId()
+                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName()
+ "."
+                            + " See http://activemq.apache.org/producer-flow-control.html
for more info");
                 }
 
-                // We can avoid blocking due to low usage if the producer is sending
+                // We can avoid blocking due to low usage if the producer is
+                // sending
                 // a sync message or
                 // if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired())
{
@@ -319,7 +330,8 @@ public class Topic extends BaseDestinati
                                     }
 
                                     if (sendProducerAck) {
-                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
message.getSize());
+                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
message
+                                                .getSize());
                                         context.getConnection().dispatchAsync(ack);
                                     } else {
                                         Response response = new Response();
@@ -338,7 +350,8 @@ public class Topic extends BaseDestinati
                             }
                         });
 
-                        // If the user manager is not full, then the task will not
+                        // If the user manager is not full, then the task will
+                        // not
                         // get called..
                         if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask))
{
                             // so call it directly here.
@@ -349,10 +362,11 @@ public class Topic extends BaseDestinati
                     }
 
                 } else {
-                    // Producer flow control cannot be used, so we have do the flow
+                    // Producer flow control cannot be used, so we have do the
+                    // flow
                     // control at the broker
                     // by blocking this thread until there is space available.
-                    
+
                     if (memoryUsage.isFull()) {
                         if (context.isInTransaction()) {
 
@@ -364,12 +378,20 @@ public class Topic extends BaseDestinati
                                 if (count > 2 && context.isInTransaction()) {
                                     count = 0;
                                     int size = context.getTransaction().size();
-                                    LOG.warn("Waiting for space to send  transacted message
- transaction elements = " + size + " need more space to commit. Message = " + message);
+                                    LOG.warn("Waiting for space to send  transacted message
- transaction elements = "
+                                            + size + " need more space to commit. Message
= " + message);
                                 }
                             }
                         } else {
-                            waitForSpace(context, memoryUsage, "Usage Manager memory limit
reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
-                                    + getActiveMQDestination().getQualifiedName() + "." +
" See http://activemq.apache.org/producer-flow-control.html for more info");
+                            waitForSpace(
+                                    context,
+                                    memoryUsage,
+                                    "Usage Manager memory limit reached. Stopping producer
("
+                                            + message.getProducerId()
+                                            + ") to prevent flooding "
+                                            + getActiveMQDestination().getQualifiedName()
+                                            + "."
+                                            + " See http://activemq.apache.org/producer-flow-control.html
for more info");
                         }
                     }
 
@@ -403,7 +425,8 @@ public class Topic extends BaseDestinati
      * @throws IOException
      * @throws Exception
      */
-    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final
Message message) throws IOException, Exception {
+    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final
Message message)
+            throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
@@ -411,15 +434,17 @@ public class Topic extends BaseDestinati
 
         if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence())
{
             if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
-                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of "
+                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId()
+                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName()
+ "."
                         + " See http://activemq.apache.org/producer-flow-control.html for
more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException(logMessage);
                 }
 
                 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
-            }      
-            result = topicStore.asyncAddTopicMessage(context, message);      
+            }
+            result = topicStore.asyncAddTopicMessage(context, message);
         }
 
         message.incrementReferenceCount();
@@ -427,7 +452,7 @@ public class Topic extends BaseDestinati
         if (context.isInTransaction()) {
             context.getTransaction().addSynchronization(new Synchronization() {
                 @Override
-                public void afterCommit() throws Exception {
+                public void beforeCommit() throws Exception {
                     // It could take while before we receive the commit
                     // operration.. by that time the message could have
                     // expired..
@@ -454,10 +479,10 @@ public class Topic extends BaseDestinati
         }
         if (result != null && !result.isCancelled()) {
             try {
-            result.get();
-            }catch(CancellationException e) {
-              //ignore - the task has been cancelled if the message
-              // has already been deleted
+                result.get();
+            } catch (CancellationException e) {
+                // ignore - the task has been cancelled if the message
+                // has already been deleted
             }
         }
 
@@ -472,7 +497,8 @@ public class Topic extends BaseDestinati
         return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size();
     }
 
-    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck
ack, final MessageReference node) throws IOException {
+    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck
ack,
+            final MessageReference node) throws IOException {
         if (topicStore != null && node.isPersistent()) {
             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             SubscriptionKey key = dsub.getSubscriptionKey();
@@ -580,7 +606,8 @@ public class Topic extends BaseDestinati
     }
 
     protected void dispatch(final ConnectionContext context, Message message) throws Exception
{
-        // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
+        // AMQ-2586: Better to leave this stat at zero than to give the user
+        // misleading metrics.
         // destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
         dispatchValve.increment();
@@ -612,7 +639,8 @@ public class Topic extends BaseDestinati
 
     public void messageExpired(ConnectionContext context, Subscription subs, MessageReference
reference) {
         broker.messageExpired(context, reference);
-        // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
+        // AMQ-2586: Better to leave this stat at zero than to give the user
+        // misleading metrics.
         // destinationStatistics.getMessages().decrement();
         destinationStatistics.getEnqueues().decrement();
         destinationStatistics.getExpired().increment();
@@ -626,11 +654,10 @@ public class Topic extends BaseDestinati
             LOG.error("Failed to remove expired Message from the store ", e);
         }
     }
-    
+
     @Override
     protected Log getLog() {
         return LOG;
     }
 
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=958009&r1=958008&r2=958009&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Jun 25 15:45:47 2010
@@ -244,7 +244,6 @@ public class KahaDBTransactionStore impl
                             doneSomething = true;
                         }
                     }
-                   
                     if (postCommit != null) {
                         postCommit.run();
                     }



Mime
View raw message