activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [activemq] branch master updated: AMQ-7228 - Avoid unnecessary lock contention when getting pending metrics
Date Fri, 14 Jun 2019 14:47:49 GMT
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new dc56fa3  AMQ-7228 - Avoid unnecessary lock contention when getting pending metrics
dc56fa3 is described below

commit dc56fa3f6ea753b692b4b3a9ffacc4f82de6af74
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
AuthorDate: Fri Jun 14 10:46:21 2019 -0400

    AMQ-7228 - Avoid unnecessary lock contention when getting pending
    metrics
---
 .../broker/region/PrefetchSubscription.java        |  4 +--
 .../activemq/broker/region/TopicSubscription.java  | 42 ++++++++--------------
 2 files changed, 15 insertions(+), 31 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index a265570..a3b9a4d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -532,9 +532,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (pendingLock) {
-            return pending.messageSize();
-        }
+        return pending.messageSize();
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index bf3f97b..0bf1c4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -32,15 +24,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transport.TransmitCallback;
@@ -48,6 +32,14 @@ import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class TopicSubscription extends AbstractSubscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
@@ -61,7 +53,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     private int maximumPendingMessages = -1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-    private int discarded;
+    private final AtomicInteger discarded = new AtomicInteger();
     private final Object matchedListMutex = new Object();
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
@@ -448,9 +440,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (matchedListMutex) {
-            return matched.messageSize();
-        }
+        return matched.messageSize();
     }
 
     @Override
@@ -482,9 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
      * @return the number of messages discarded due to being a slow consumer
      */
     public int discarded() {
-        synchronized (matchedListMutex) {
-            return discarded;
-        }
+        return discarded.get();
     }
 
     /**
@@ -493,9 +481,7 @@ public class TopicSubscription extends AbstractSubscription {
      *         prefetch buffer being full).
      */
     public int matched() {
-        synchronized (matchedListMutex) {
-            return matched.size();
-        }
+        return matched.size();
     }
 
     /**
@@ -727,7 +713,7 @@ public class TopicSubscription extends AbstractSubscription {
         try {
             message.decrementReferenceCount();
             matched.remove(message);
-            discarded++;
+            discarded.incrementAndGet();
             if (destination != null) {
                 destination.getDestinationStatistics().getDequeues().increment();
             }


Mime
View raw message