activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5668
Date Wed, 17 Jun 2015 23:10:38 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 829e6fb0e -> 1e8621fed


https://issues.apache.org/jira/browse/AMQ-5668

This commit fixes a race condition in AbstractStoreCursor.setLastCachedId that could
cause a null pointer exception in certain cases.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b40dc4cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b40dc4cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b40dc4cc

Branch: refs/heads/master
Commit: b40dc4cc5452455dd93f53abf71e92bb6cd0b9cc
Parents: 829e6fb
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Jun 17 13:23:08 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jun 17 19:09:53 2015 -0400

----------------------------------------------------------------------
 .../region/cursors/AbstractStoreCursor.java     | 66 ++++++++------
 .../cursors/AbstractStoreCursorNpeTest.java     | 93 ++++++++++++++++++++
 2 files changed, 130 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b40dc4cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 07d4351..4bdd7f6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -58,15 +58,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
             this.batchList = new OrderedPendingList();
         }
     }
-    
-    
+
+
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
             resetBatch();
             resetSize();
             setCacheEnabled(size==0&&useCache);
-        } 
+        }
     }
 
     protected void resetSize() {
@@ -84,7 +84,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         gc();
     }
 
-    
+
     public final boolean recoverMessage(Message message) throws Exception {
         return recoverMessage(message,false);
     }
@@ -148,12 +148,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         clearIterator(true);
         size();
     }
-    
-    
+
+
     public synchronized void release() {
         clearIterator(false);
     }
-    
+
     private synchronized void clearIterator(boolean ensureIterator) {
         boolean haveIterator = this.iterator != null;
         this.iterator=null;
@@ -161,7 +161,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
             ensureIterator();
         }
     }
-    
+
     private synchronized void ensureIterator() {
         if(this.iterator==null) {
             this.iterator=this.batchList.iterator();
@@ -171,8 +171,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
 
     public final void finished() {
     }
-        
-    
+
+
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -185,8 +185,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         ensureIterator();
         return this.iterator.hasNext();
     }
-    
-    
+
+
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -198,7 +198,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         }
         return result;
     }
-    
+
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
@@ -314,23 +314,31 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
     }
 
     private void setLastCachedId(final int index, MessageId candidate) {
-        if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong()
== null) {  // possibly null for topics
-            lastCachedIds[index] = candidate;
-        } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong()))
> 0) {
+        MessageId lastCacheId = lastCachedIds[index];
+        if (lastCacheId == null) {
             lastCachedIds[index] = candidate;
+        } else {
+            Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong();
+            Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong();
+            if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics
+                lastCachedIds[index] = candidate;
+            } else if (candidateOrSequenceLong != null &&
+                    Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong))
> 0) {
+                lastCachedIds[index] = candidate;
+            }
         }
     }
 
     protected void setBatch(MessageId messageId) throws Exception {
     }
 
-    
+
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         setCacheEnabled(false);
         size++;
     }
 
-    
+
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -341,20 +349,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
         }
     }
 
-    
+
     public final synchronized void remove(MessageReference node) {
         if (batchList.remove(node) != null) {
             size--;
             setCacheEnabled(false);
         }
     }
-    
-    
+
+
     public final synchronized void clear() {
         gc();
     }
-    
-    
+
+
     public synchronized void gc() {
         for (MessageReference msg : batchList) {
             rollback(msg.getMessageId());
@@ -385,14 +393,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
             }
         }
     }
-    
-    
+
+
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
-    
+
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
@@ -413,13 +421,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
                     + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:"
+ (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
                     + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:"
+ (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() :
"null");
     }
-    
+
     protected abstract void doFillBatch() throws Exception;
-    
+
     protected abstract void resetBatch();
 
     protected abstract int getStoreSize();
-    
+
     protected abstract boolean isStoreEmpty();
 
     public Subscription getSubscription() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b40dc4cc/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java
new file mode 100755
index 0000000..4d47165
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.test.TestSupport;
+
+/**
+ * This test shows that a null pointer exception will not occur when unsubscribing from a
+ * subscription while a producer is sending messages rapidly to the topic.  A null pointer
+ * exception was occurring in the setLastCachedId method of AbstractStoreCursor due to
+ * a race condition.  If this test is run before the patch that is applied in this commit
+ * on AbstractStoreCursor, it will consistently fail with a NPE.
+ *
+ */
+public class AbstractStoreCursorNpeTest extends TestSupport {
+
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Topic destination;
+
+
+    public void testSetLastCachedIdNPE() throws Exception {
+        connection = createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = session.createTopic("test.topic");
+        producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+
+        Connection durableCon = createConnection();
+        durableCon.setClientID("testCons");
+        durableCon.start();
+        final Session durSession = durableCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //In a new thread rapidly subscribe and unsubscribe to a durable subscription
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try{
+                    //Repeatedly create a durable subscription and then unsubscribe which used to
+                    //cause a NPE while messages were sending
+                    while(true) {
+                        MessageConsumer cons = durSession.createDurableSubscriber(durSession.createTopic("test.topic"),
"sub1");
+                        Thread.sleep(100);
+                        cons.close();
+                        durSession.unsubscribe("sub1");
+                    }
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        });
+
+        TextMessage myMessage = new ActiveMQTextMessage();
+        myMessage.setText("test");
+        //Make sure that we can send a bunch of messages without a NPE
+        //This would fail if the patch is not applied
+        for (int i = 0; i < 10000; i++) {
+            producer.send(myMessage);
+        }
+    }
+}


Mime
View raw message