activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1419 OpenWire advisory message never deleted
Date Wed, 13 Sep 2017 22:21:48 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 17083d69e -> 02b6b8c8e


ARTEMIS-1419 OpenWire advisory message never deleted

By default, every OpenWire connection creates a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If an OpenWire client creates temporary queues, these advisory
queues fill up with messages for as long as the associated
OpenWire connection is alive. It appears these messages
are never consumed from the queues.

The reason behind this is that advisory messages don't require
acknowledgement, so the messages stay in the queue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e4fb722a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e4fb722a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e4fb722a

Branch: refs/heads/master
Commit: e4fb722ad8480dd12c037230706ef97d9a57f801
Parents: 17083d6
Author: Howard Gao <howard.gao@gmail.com>
Authored: Wed Sep 13 20:50:56 2017 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Sep 13 18:21:43 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java |  6 +++
 .../core/server/impl/ServerConsumerImpl.java    |  6 ++-
 .../openwire/SimpleOpenWireTest.java            | 42 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4fb722a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 61d2933..010a7aa 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -82,10 +83,13 @@ public class AMQConsumer {
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId)
throws Exception {
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
+      boolean preAck = false;
       if (info.isNoLocal()) {
          if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
             //tell the connection to add the property
             this.session.getConnection().setNoLocal(true);
+         } else {
+            preAck = true;
          }
          String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'"
+ this.getId().getConnectionId() + "'";
          if (selector == null) {
@@ -110,6 +114,8 @@ public class AMQConsumer {
 
          serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null,
info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+         //only advisory topic consumers need this.
+         ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
       } else {
          SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4fb722a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a685163..98015e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -137,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
    private final SessionCallback callback;
 
-   private final boolean preAcknowledge;
+   private boolean preAcknowledge;
 
    private final ManagementService managementService;
 
@@ -1139,6 +1139,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
       }
    };
 
+   public void setPreAcknowledge(boolean preAcknowledge) {
+      this.preAcknowledge = preAcknowledge;
+   }
+
    /**
     * Internal encapsulation of the logic on sending LargeMessages.
     * This Inner class was created to avoid a bunch of loose properties about the current
LargeMessage being sent

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4fb722a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index f0fc8a6..dac67fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -60,6 +60,7 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
@@ -1585,6 +1586,47 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       assertNull(transaction);
    }
 
+   @Test
+   public void testTempQueueLeak() throws Exception {
+      final Connection[] connections = new Connection[20];
+
+      try {
+         for (int i = 0; i < connections.length; i++) {
+            connections[i] = factory.createConnection();
+            connections[i].start();
+         }
+
+         Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < connections.length; i++) {
+            TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+            temporaryQueue.delete();
+         }
+
+         Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
+         AddressControl addressControl = null;
+
+         for (Object addressResource : addressResources) {
+
+            if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue"))
{
+               addressControl = (AddressControl) addressResource;
+            }
+         }
+
+         assertNotNull("addressControl for temp advisory", addressControl);
+
+         //sleep a bit to allow message count to go down.
+         Thread.sleep(50);
+         assertEquals(0, addressControl.getMessageCount());
+      } finally {
+         for (Connection conn : connections) {
+            if (conn != null) {
+               conn.close();
+            }
+         }
+      }
+   }
+
    private void checkQueueEmpty(String qName) {
       PostOffice po = server.getPostOffice();
       LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));


Mime
View raw message