activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] 01/03: ARTEMIS-2200 NPE while dropping/failing large messages on paging
Date Thu, 17 Jan 2019 20:39:39 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 702f445205de953a343c1beef436fc8b984c7624
Author: Francesco Nigro <nigro.fra@gmail.com>
AuthorDate: Fri Dec 7 09:35:08 2018 +0100

    ARTEMIS-2200 NPE while dropping/failing large messages on paging
    
    Large messages pendingRecordID is not accessed atomically, leading
    to races that would lead to records that cannot been found on the
    journal for deletion: it would lead to cause NPE that won't clean
    the pending tasks on the current OperationContextImpl.
    Adding a cleanup on error of those tasks and avoiding the race
    to happen by adding proper synchronization will both enforce
    correct clean up when something bad happen and avoid NPE.
---
 .../artemis/core/journal/impl/JournalImpl.java     |  1 +
 .../journal/AbstractJournalStorageManager.java     | 14 +++--
 .../impl/journal/JournalStorageManager.java        | 20 ++++---
 .../core/postoffice/impl/PostOfficeImpl.java       | 14 +++--
 .../openwire/OpenWireLargeMessageTest.java         | 69 ++++++++++++++++++++++
 5 files changed, 97 insertions(+), 21 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index fe08ed8..c9d0dce 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -974,6 +974,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
             } catch (Throwable e) {
                result.fail(e);
                logger.error("appendDeleteRecord:" + e, e);
+               setErrorCondition(callback, null, e);
             } finally {
                journalLock.readLock().unlock();
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index b227c98..2790f32 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1654,12 +1654,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    // Package protected ---------------------------------------------
 
    protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
-      if (largeServerMessage.getPendingRecordID() >= 0) {
-         try {
-            confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-            largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+      synchronized (largeServerMessage) {
+         if (largeServerMessage.getPendingRecordID() >= 0) {
+            try {
+               confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
+               largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            }
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index fb1b3d7..e245740 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -454,15 +454,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
    // This should be accessed from this package only
    void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws ActiveMQException
{
-      if (largeServerMessage.getPendingRecordID() < 0) {
-         try {
-            // The delete file happens asynchronously
-            // And the client won't be waiting for the actual file to be deleted.
-            // We set a temporary record (short lived) on the journal
-            // to avoid a situation where the server is restarted and pending large message
stays on forever
-            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(),
largeServerMessage.getPendingRecordID()));
-         } catch (Exception e) {
-            throw new ActiveMQInternalErrorException(e.getMessage(), e);
+      synchronized (largeServerMessage) {
+         if (largeServerMessage.getPendingRecordID() < 0) {
+            try {
+               // The delete file happens asynchronously
+               // And the client won't be waiting for the actual file to be deleted.
+               // We set a temporary record (short lived) on the journal
+               // to avoid a situation where the server is restarted and pending large message
stays on forever
+               largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(),
largeServerMessage.getPendingRecordID()));
+            } catch (Exception e) {
+               throw new ActiveMQInternalErrorException(e.getMessage(), e);
+            }
          }
       }
       final SequentialFile file = largeServerMessage.getFile();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bf12baf..7ea2ab6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1364,13 +1364,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
     */
    private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception
{
       LargeServerMessage largeServerMessage = (LargeServerMessage) message;
-      if (largeServerMessage.getPendingRecordID() >= 0) {
-         if (tx == null) {
-            storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-         } else {
-            storageManager.confirmPendingLargeMessageTX(tx, largeServerMessage.getMessageID(),
largeServerMessage.getPendingRecordID());
+      synchronized (largeServerMessage) {
+         if (largeServerMessage.getPendingRecordID() >= 0) {
+            if (tx == null) {
+               storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
+            } else {
+               storageManager.confirmPendingLargeMessageTX(tx, largeServerMessage.getMessageID(),
largeServerMessage.getPendingRecordID());
+            }
+            largeServerMessage.setPendingRecordID(-1);
          }
-         largeServerMessage.setPendingRecordID(-1);
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
index 99dcbdb..01c56b7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
@@ -23,9 +23,15 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +42,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
    }
 
    public SimpleString lmAddress = new SimpleString("LargeMessageAddress");
+   public SimpleString lmDropAddress = new SimpleString("LargeMessageDropAddress");
 
    @Override
    @Before
@@ -43,6 +50,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
       this.realStore = true;
       super.setUp();
       server.createQueue(lmAddress, RoutingType.ANYCAST, lmAddress, null, true, false);
+      server.createQueue(lmDropAddress, RoutingType.ANYCAST, lmDropAddress, null, true, false);
    }
 
    @Test
@@ -62,6 +70,18 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
       }
    }
 
+   @Override
+   protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap)
{
+      addressSettingsMap.put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true));
+      addressSettingsMap.put(lmDropAddress.toString(),
+                             new AddressSettings()
+                                .setMaxSizeBytes(15 * 1024 * 1024)
+                                .setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP)
+                                .setMessageCounterHistoryDayLimit(10)
+                                .setRedeliveryDelay(0)
+                                .setMaxDeliveryAttempts(0));
+   }
+
    @Test
    public void testSendReceiveLargeMessage() throws Exception {
       // Create 1MB Message
@@ -103,4 +123,53 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
          assertArrayEquals(body, bytes);
       }
    }
+
+   @Test
+   public void testFastLargeMessageProducerDropOnPaging() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      try {
+         // Create 100K Message
+         int size = 100 * 1024;
+
+         final byte[] bytes = new byte[size];
+
+         try (Connection connection = factory.createConnection()) {
+            connection.start();
+
+            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE))
{
+               Queue queue = session.createQueue(lmDropAddress.toString());
+               try (MessageProducer producer = session.createProducer(queue)) {
+                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+                  bytes[0] = 1;
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(bytes);
+
+                  final PagingStore pageStore = server.getPagingManager().getPageStore(lmDropAddress);
+                  while (!pageStore.isPaging()) {
+                     producer.send(message);
+                  }
+                  for (int i = 0; i < 10; i++) {
+                     producer.send(message);
+                  }
+                  final long messageCount = server.locateQueue(lmDropAddress).getMessageCount();
+                  Assert.assertTrue("The queue cannot be empty", messageCount > 0);
+                  try (MessageConsumer messageConsumer = session.createConsumer(queue)) {
+                     for (long m = 0; m < messageCount; m++) {
+                        if (messageConsumer.receive(2000) == null) {
+                           Assert.fail("The messages are not finished yet");
+                        }
+                     }
+                  }
+               }
+            }
+         }
+         server.stop();
+         Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException"));
+         Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find record"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
 }


Mime
View raw message