activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: AMQ-6642: Fix potential NPE on updateMessage
Date Fri, 31 Mar 2017 13:09:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x c4f8361d5 -> bd083623d


AMQ-6642: Fix potential NPE on updateMessage

Fixing potential NPE when calling updateMessage on a Queue store in
KahaDB if subscription statistics are enabled.  Also reduced the
visibility from public to protected for subscription statistic related
methods that shouldn't be public and added null pointer checks there as
well.

https://issues.apache.org/jira/browse/AMQ-6642
(cherry picked from commit 2731f04f1ca81312d08ffc21c0ceb09513165b7c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd083623
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd083623
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd083623

Branch: refs/heads/activemq-5.14.x
Commit: bd083623dfbe98d025c31afa8471ad680922ba65
Parents: c4f8361
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri Mar 31 09:06:01 2017 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Mar 31 09:09:17 2017 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 49 +++++++++++---------
 .../store/kahadb/MessageDatabaseSizeTest.java   | 41 ++++++++++++++++
 2 files changed, 68 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bd083623/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 4fdaa01..34509b1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1515,7 +1515,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
 
                 //update all the subscription metrics
-                if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize())
{
+                if (enableSubscriptionStatistics && sd.ackPositions != null &&
location.getSize() != previousKeys.location.getSize()) {
                     Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
                     while (iter.hasNext()) {
                         Entry<String, SequenceSet> e = iter.next();
@@ -2961,33 +2961,38 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         return sd.subscriptionAcks.get(tx, subscriptionKey);
     }
 
-    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
-        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
-        if (messageSequences != null) {
-            long result = messageSequences.rangeSize();
-            // if there's anything in the range the last value is always the nextMessage
marker, so remove 1.
-            return result > 0 ? result - 1 : 0;
+    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
+        if (sd.ackPositions != null) {
+            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+            if (messageSequences != null) {
+                long result = messageSequences.rangeSize();
+                // if there's anything in the range the last value is always the nextMessage
marker, so remove 1.
+                return result > 0 ? result - 1 : 0;
+            }
         }
 
         return 0;
     }
 
-    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
-        //grab the messages attached to this subscription
-        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
-
+    protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
         long locationSize = 0;
-        if (messageSequences != null) {
-            Sequence head = messageSequences.getHead();
-            if (head != null) {
-                //get an iterator over the order index starting at the first unacked message
-                //and go over each message to add up the size
-                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
-                        new MessageOrderCursor(head.getFirst()));
-
-                while (iterator.hasNext()) {
-                    Entry<Long, MessageKeys> entry = iterator.next();
-                    locationSize += entry.getValue().location.getSize();
+
+        if (sd.ackPositions != null) {
+            //grab the messages attached to this subscription
+            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+
+            if (messageSequences != null) {
+                Sequence head = messageSequences.getHead();
+                if (head != null) {
+                    //get an iterator over the order index starting at the first unacked
message
+                    //and go over each message to add up the size
+                    Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
+                            new MessageOrderCursor(head.getFirst()));
+
+                    while (iterator.hasNext()) {
+                        Entry<Long, MessageKeys> entry = iterator.next();
+                        locationSize += entry.getValue().location.getSize();
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd083623/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
index 357dc5f..4deb1e0 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -17,9 +17,12 @@
 package org.apache.activemq.store.kahadb;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -36,13 +39,27 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class MessageDatabaseSizeTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
 
+    @Parameters(name = "subStatsEnabled={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                // Subscription stats on
+                {true},
+                // Subscription stats off
+                {false}
+        });
+    }
+
     @Rule
     public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
     private final String payload = new String(new byte[1024]);
@@ -50,6 +67,13 @@ public class MessageDatabaseSizeTest {
     private BrokerService broker = null;
     private final ActiveMQQueue destination = new ActiveMQQueue("Test");
     private KahaDBPersistenceAdapter adapter;
+    private boolean subStatsEnabled;
+
+    public MessageDatabaseSizeTest(boolean subStatsEnabled) {
+        super();
+        this.subStatsEnabled = subStatsEnabled;
+    }
+
 
     protected void startBroker() throws Exception {
         broker = new BrokerService();
@@ -58,6 +82,7 @@ public class MessageDatabaseSizeTest {
         broker.setUseJmx(true);
         broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
         adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        adapter.setEnableSubscriptionStatistics(subStatsEnabled);
         broker.start();
         LOG.info("Starting broker..");
     }
@@ -101,6 +126,22 @@ public class MessageDatabaseSizeTest {
         assertEquals(existingSize, messageStore.getMessageSize());
     }
 
+    @Test
+    public void testUpdateMessageSameLocationDifferentSize() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        MessageId messageId = new MessageId("111:222:333");
+        ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
+        textMessage.setText("new size of message");
+        messageStore.updateMessage(textMessage);
+
+        assertNotNull(findMessageLocation(messageId.toString(), store.convert(destination)));
+
+    }
+
     /**
      * Test that when updating an existing message to a different location in the
      * journal that the index size doesn't change


Mime
View raw message