activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-3054 Fix inconsistencies between replica catchup and page cleanup
Date Fri, 08 Jan 2021 19:17:10 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d66b6  ARTEMIS-3054 Fix inconsistencies between replica catchup and page cleanup
     new c90d7df  This closes #3400
b4d66b6 is described below

commit b4d66b684a44669615b86d971268f925ae0ee60c
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Wed Jan 6 21:25:46 2021 -0500

    ARTEMIS-3054 Fix inconsistencies between replica catchup and page cleanup
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |  20 ++-
 .../paging/cursor/impl/PageSubscriptionImpl.java   |   4 +
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  10 ++
 .../impl/journal/JournalStorageManager.java        |   6 +
 .../client/JMSPagingFileDeleteTest.java            |   6 +-
 .../PageCleanupWhileReplicaCatchupTest.java        | 174 +++++++++++++++++++++
 .../tests/integration/paging/PagingTest.java       |  11 +-
 7 files changed, 210 insertions(+), 21 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 0a8168d..7ace24b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -428,6 +428,16 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
       ArrayList<Page> depagedPages = new ArrayList<>();
 
+      // This read lock is required
+      // because in case of a replicated configuration
+      // The replication manager will first get a writeLock on the StorageManager
+      // for a short period when it is getting a list of IDs to send to the replica
+      // Not getting this lock now could eventually result in a dead lock for a different order
+      //
+      // I tried to simplify the locks but each PageStore has its own lock, so this was the best option
+      // I found in order to fix https://issues.apache.org/jira/browse/ARTEMIS-3054
+      storageManager.readLock();
+
       while (true) {
          if (pagingStore.lock(100)) {
             break;
@@ -471,7 +481,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                }
             }
 
-            for (long i = pagingStore.getFirstPage(); i < minPage; i++) {
+            for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
                if (!checkPageCompletion(cursorList, i)) {
                   break;
                }
@@ -495,9 +505,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             }
          } catch (Exception ex) {
             ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex, pagingStore.getAddress());
+            logger.warn(ex.getMessage(), ex);
             return;
          } finally {
             pagingStore.unlock();
+            storageManager.readUnLock();
          }
       }
       finishCleanup(depagedPages);
@@ -625,12 +637,6 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          for (PageSubscription cursor : cursorList) {
             cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
          }
-
-         // we just need to make sure the storage is done..
-         // if the thread pool is full, we will just log it once instead of looping
-         if (!storageManager.waitOnOperations(5000)) {
-            ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext());
-         }
       } finally {
          for (PageSubscription cursor : cursorList) {
             cursor.enableAutoCleanup();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 7e1bd2c..67a5cf4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -1182,6 +1182,10 @@ public final class PageSubscriptionImpl implements PageSubscription {
          PageCache localCache = this.cache.get();
          if (localCache == null) {
             localCache = cursorProvider.getPageCache(pageId);
+            // this could happen if the file does not exist any more, after cleanup
+            if (localCache == null) {
+               return 0;
+            }
             this.cache = new WeakReference<>(localCache);
          }
          int numberOfMessage = localCache.getNumberOfMessages();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 31c969d..3c248af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -400,6 +400,16 @@ public class PagingStoreImpl implements PagingStore {
       }
    }
 
+   public int getNumberOfFiles() throws Exception {
+      final SequentialFileFactory fileFactory = this.fileFactory;
+      if (fileFactory != null) {
+         List<String> files = fileFactory.listFiles("page");
+         return files.size();
+      }
+
+      return 0;
+   }
+
    @Override
    public void start() throws Exception {
       lock.writeLock().lock();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 3fa73b7..59dcc63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -637,6 +637,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       try {
          Map<SimpleString, Collection<Integer>> pageFilesToSync;
          storageManagerLock.writeLock().lock();
+
+         // We need to get this lock here in order to
+         // avoid a clash with Page.cleanup();
+         // This was a fix part of https://issues.apache.org/jira/browse/ARTEMIS-3054
+         pagingManager.lock();
          try {
             if (isReplicated())
                throw new ActiveMQIllegalStateException("already replicating");
@@ -680,6 +685,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
          } finally {
             storageManagerLock.writeLock().unlock();
+            pagingManager.unlock();
          }
 
          sendJournalFile(messageFiles, JournalContent.MESSAGES);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
index ff23351..5e883a6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
@@ -196,11 +196,7 @@ public class JMSPagingFileDeleteTest extends JMSTestBase {
 
          timeout = System.currentTimeMillis() + 10000;
 
-         while (timeout > System.currentTimeMillis() && pagingStore.getNumberOfPages() != 1) {
-            Thread.sleep(100);
-         }
-
-         assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+         Wait.assertEquals(0, pagingStore::getNumberOfPages); //I expected number of the page is 1, but It was not.
       } finally {
          if (connection != null) {
             connection.close();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java
new file mode 100644
index 0000000..1e977d6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase {
+
+   private static final Logger logger = Logger.getLogger(PageCleanupWhileReplicaCatchupTest.class);
+   volatile boolean running = true;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      startBackupServer = false;
+      super.setUp();
+   }
+
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   @Override
+   protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
+                                                     final Configuration configuration,
+                                                     final NodeManager nodeManager,
+                                                     int id) {
+      Map<String, AddressSettings> conf = new HashMap<>();
+      AddressSettings as = new AddressSettings().setMaxSizeBytes(PAGE_MAX).setPageSizeBytes(PAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      conf.put(ADDRESS.toString(), as);
+      return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX, conf, nodeManager, id);
+   }
+
+   @Test(timeout = 120_000)
+   public void testPageCleanup() throws Throwable {
+      int numberOfWorkers = 20;
+
+      Worker[] workers = new Worker[numberOfWorkers];
+
+      for (int i = 0; i < 20; i++) {
+         liveServer.getServer().addAddressInfo(new AddressInfo("WORKER_" + i).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+         liveServer.getServer().createQueue(new QueueConfiguration("WORKER_" + i).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+         workers[i] = new Worker("WORKER_" + i);
+         workers[i].start();
+      }
+
+      for (int i = 0; i < 25; i++) {
+         logger.debug("Starting replica " + i);
+         backupServer.start();
+         Wait.assertTrue(backupServer.getServer()::isReplicaSync);
+         backupServer.stop();
+      }
+
+      running = false;
+
+      for (Worker worker : workers) {
+         worker.join();
+      }
+
+      Throwable toThrow = null;
+      for (Worker worker : workers) {
+         if (worker.throwable != null) {
+            worker.queue.getPagingStore().getCursorProvider().scheduleCleanup();
+            Thread.sleep(2000);
+            worker.queue.getPagingStore().getCursorProvider().cleanup();
+
+            // This is more a debug statement in case there is an issue with the test
+            System.out.println("PagingStore(" + worker.queueName + ")::isPaging() = " + worker.queue.getPagingStore().isPaging() + " after test failure " + worker.throwable.getMessage());
+            toThrow = worker.throwable;
+         }
+      }
+
+      if (toThrow != null) {
+         throw toThrow;
+      }
+
+      for (Worker worker : workers) {
+         PagingStoreImpl storeImpl = (PagingStoreImpl)worker.queue.getPagingStore();
+         Assert.assertTrue("Store impl " + worker.queueName + " had more files than expected on " + storeImpl.getFolder(), storeImpl.getNumberOfFiles() <= 1);
+      }
+   }
+
+   class Worker extends Thread {
+
+      final String queueName;
+      final Queue queue;
+      volatile Throwable throwable;
+
+      Worker(String queue) {
+         super("Worker on queue " + queue + " for test on PageCleanupWhileReplicaCatchupTest");
+         this.queueName = queue;
+         this.queue = liveServer.getServer().locateQueue(queueName);
+      }
+
+      @Override
+      public void run() {
+         try {
+            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+            try (Connection connection = factory.createConnection()) {
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               connection.start();
+               javax.jms.Queue jmsQueue = session.createQueue(queueName);
+               MessageConsumer consumer = session.createConsumer(jmsQueue);
+               MessageProducer producer = session.createProducer(jmsQueue);
+               while (running) {
+                  queue.getPagingStore().startPaging();
+                  for (int i = 0; i < 10; i++) {
+                     producer.send(session.createTextMessage("hello " + i));
+                  }
+                  Wait.assertTrue(queue.getPagingStore()::isPaging);
+                  for (int i = 0; i < 10; i++) {
+                     Assert.assertNotNull(consumer.receive(5000));
+                  }
+                  Wait.assertFalse("Waiting for !Paging on " + queueName + " with folder " + queue.getPagingStore().getFolder(), queue.getPagingStore()::isPaging);
+               }
+            }
+         } catch (Throwable e) {
+            e.printStackTrace(System.out);
+            this.throwable = e;
+         }
+
+      }
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index ec57605..eaa6c61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -490,13 +490,6 @@ public class PagingTest extends ActiveMQTestBase {
       waitForNotPaging(queue);
 
       server.stop();
-
-      HashMap<Integer, AtomicInteger> counts = countJournalLivingRecords(server.getConfiguration());
-
-      AtomicInteger pgComplete = counts.get((int) JournalRecordIds.PAGE_CURSOR_COMPLETE);
-
-      assertTrue(pgComplete == null || pgComplete.get() == 0);
-
    }
 
    @Test
@@ -4630,7 +4623,7 @@ public class PagingTest extends ActiveMQTestBase {
       // It's async, so need to wait a bit for it happening
       assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
-      Wait.assertEquals(1, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
+      Wait.assertEquals(0, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
    }
 
    @Test
@@ -6004,7 +5997,7 @@ public class PagingTest extends ActiveMQTestBase {
 
          locator.close();
 
-         Wait.assertEquals(2, store::getNumberOfPages);
+         Wait.assertEquals(0, store::getNumberOfPages);
 
       } finally {
          try {


Mime
View raw message