activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-6891] test and fix non tx variant of this leak
Date Tue, 30 Jan 2018 10:51:59 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 80ef6d312 -> dd2572bcb


[AMQ-6891] test and fix non tx variant of this leak


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dd2572bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dd2572bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dd2572bc

Branch: refs/heads/master
Commit: dd2572bcb1c3793a8a2fa19cc4fc88cc8481f96e
Parents: 80ef6d3
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jan 30 10:51:18 2018 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jan 30 10:51:30 2018 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  7 ++--
 .../jdbc/JmsTransactionCommitFailureTest.java   | 42 ++++++++++++++++++--
 2 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dd2572bc/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index dccd296..04488a2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -774,11 +774,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         }
     }
 
-    public void rollbackPendingCursorAdditions(MessageContext messageContext) {
+    public void rollbackPendingCursorAdditions(MessageId messageId) {
         synchronized (indexOrderedCursorUpdates) {
             for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
                 MessageContext mc = indexOrderedCursorUpdates.get(i);
-                if (mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
+                if (mc.message.getMessageId().equals(messageId)) {
                     indexOrderedCursorUpdates.remove(mc);
                     if (mc.onCompletion != null) {
                         mc.onCompletion.run();
@@ -854,7 +854,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         @Override
         public void afterRollback() throws Exception {
             if (store != null && messageContext.message.isPersistent()) {
-                rollbackPendingCursorAdditions(messageContext);
+                rollbackPendingCursorAdditions(messageContext.message.getMessageId());
             }
             messageContext.message.decrementReferenceCount();
         }
@@ -888,6 +888,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     // before restarting normal broker operations
                     resetNeeded = true;
                     pendingSends.decrementAndGet();
+                    rollbackPendingCursorAdditions(message.getMessageId());
                     throw e;
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd2572bc/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
index 3e423a8..db7d156 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
@@ -182,7 +182,7 @@ public class JmsTransactionCommitFailureTest {
         // Set failure flag on persistence adapter
         persistenceAdapter.setCommitFailureEnabled(true);
         try {
-            for (int i = 0; i < 1000; i++) {
+            for (int i = 0; i < 10; i++) {
                 try {
                     sendMessage(queueName, 2);
                 } catch (JMSException jmse) {
@@ -202,10 +202,44 @@ public class JmsTransactionCommitFailureTest {
         }
     }
 
+
+    @Test
+    public void testQueueMemoryLeakNoTx() throws Exception {
+        String queueName = "testMemoryLeak";
+
+        sendMessage(queueName, 1);
+
+        // Set failure flag on persistence adapter
+        persistenceAdapter.setCommitFailureEnabled(true);
+        try {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    sendMessage(queueName, 2, false);
+                } catch (JMSException jmse) {
+                    // Expected
+                }
+            }
+        } finally {
+            persistenceAdapter.setCommitFailureEnabled(false);
+        }
+        Destination destination = broker.getDestination(new ActiveMQQueue(queueName));
+        if (destination instanceof org.apache.activemq.broker.region.Queue) {
+            org.apache.activemq.broker.region.Queue queue = (org.apache.activemq.broker.region.Queue) destination;
+            Field listField = org.apache.activemq.broker.region.Queue.class.getDeclaredField("indexOrderedCursorUpdates");
+            listField.setAccessible(true);
+            List<?> list = (List<?>) listField.get(queue);
+            Assert.assertEquals(0, list.size());
+        }
+    }
+
     private void sendMessage(String queueName, int count) throws JMSException {
+        sendMessage(queueName, count, true);
+    }
+
+    private void sendMessage(String queueName, int count, boolean transacted) throws JMSException {
         Connection con = connectionFactory.createConnection();
         try {
-            Session session = con.createSession(true, Session.SESSION_TRANSACTED);
+            Session session = con.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
             try {
                 Queue destination = session.createQueue(queueName);
                 MessageProducer producer = session.createProducer(destination);
@@ -216,7 +250,9 @@ public class JmsTransactionCommitFailureTest {
                         message.setText("Message-" + messageCounter++);
                         producer.send(message);
                     }
-                    session.commit();
+                    if (transacted) {
+                        session.commit();
+                    }
                 } finally {
                     producer.close();
                 }


Mime
View raw message