activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-6 git commit: ACTIVEMQ6-64 Messages duplicated during ScaleDown or queue.totalIterator
Date Tue, 06 Jan 2015 16:58:11 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 887743f09 -> c9de5c763


ACTIVEMQ6-64 Messages duplicated during ScaleDown or queue.totalIterator

https://issues.apache.org/jira/browse/ACTIVEMQ6-64

The redelivery list was not isolated per PageIterator. This commit moves the
redelivery list into the iterator so that each iterator has its own, properly isolated redelivery state.

The previous version assumed a single instance of PageIterator, QueueImpl and PageSubscription.
Once we started using more than one instance of the iterator, that assumption broke and caused this bug.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/e0b0b6bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/e0b0b6bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/e0b0b6bf

Branch: refs/heads/master
Commit: e0b0b6bf8930cf06c017ab0125b3acb33d586eea
Parents: 887743f
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Jan 5 16:39:42 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jan 5 20:34:05 2015 -0500

----------------------------------------------------------------------
 .../core/paging/PageTransactionInfo.java        |   5 +-
 .../core/paging/cursor/PageIterator.java        |  29 ++++
 .../core/paging/cursor/PageSubscription.java    |   4 +-
 .../cursor/impl/PageSubscriptionImpl.java       |  29 ++--
 .../paging/impl/PageTransactionInfoImpl.java    |  55 +++++--
 .../integration/paging/PagingSendTest.java      | 158 ++++++++++++++++++-
 6 files changed, 252 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java
index 02b6d9b..39833c6 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.core.paging;
 
 import org.apache.activemq.core.journal.EncodingSupport;
+import org.apache.activemq.core.paging.cursor.PageIterator;
 import org.apache.activemq.core.paging.cursor.PagePosition;
 import org.apache.activemq.core.paging.cursor.PageSubscription;
 import org.apache.activemq.core.persistence.StorageManager;
@@ -61,10 +62,8 @@ public interface PageTransactionInfo extends EncodingSupport
    /**
     * This method will hold the position to be delivered later in case this transaction is
pending.
     * If the tx is not pending, it will return false, so the caller can deliver it right
away
-    * @param cursor
-    * @param cursorPos
     * @return true if the message will be delivered later, false if it should be delivered
right away
     */
-   boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
+   boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagePosition
cursorPos);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java
new file mode 100644
index 0000000..a5a64c9
--- /dev/null
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.core.paging.cursor;
+
+import org.apache.activemq.utils.LinkedListIterator;
+
+/**
+ * @author clebertsuconic
+ */
+
+public interface PageIterator extends LinkedListIterator<PagedReference>
+{
+   void redeliver(PagePosition reference);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java
index af6a67d..5423658 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java
@@ -125,12 +125,12 @@ public interface PageSubscription
     * To be used on redeliveries
     * @param position
     */
-   void redeliver(PagePosition position);
+   void redeliver(PageIterator iterator, PagePosition position);
 
    void printDebug();
 
    /**
-    * @param minPage
+    * @param page
     * @return
     */
    boolean isComplete(long page);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java
index 39263a7..6482795 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -39,6 +39,7 @@ import org.apache.activemq.core.paging.PagedMessage;
 import org.apache.activemq.core.paging.PagingStore;
 import org.apache.activemq.core.paging.cursor.PageCache;
 import org.apache.activemq.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.core.paging.cursor.PageIterator;
 import org.apache.activemq.core.paging.cursor.PagePosition;
 import org.apache.activemq.core.paging.cursor.PageSubscription;
 import org.apache.activemq.core.paging.cursor.PageSubscriptionCounter;
@@ -55,7 +56,6 @@ import org.apache.activemq.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.utils.ConcurrentHashSet;
 import org.apache.activemq.utils.FutureLatch;
-import org.apache.activemq.utils.LinkedListIterator;
 
 /**
  * A PageCursorImpl
@@ -99,9 +99,6 @@ final class PageSubscriptionImpl implements PageSubscription
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
-   // We only store the position for redeliveries. They will be read from the SoftCache again
during delivery.
-   private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
-
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
                         final StorageManager store,
@@ -378,7 +375,7 @@ final class PageSubscriptionImpl implements PageSubscription
    }
 
    @Override
-   public LinkedListIterator<PagedReference> iterator()
+   public PageIterator iterator()
    {
       return new CursorIterator();
    }
@@ -583,12 +580,9 @@ final class PageSubscriptionImpl implements PageSubscription
    }
 
    @Override
-   public void redeliver(final PagePosition position)
+   public void redeliver(final PageIterator iterator, final PagePosition position)
    {
-      synchronized (redeliveries)
-      {
-         redeliveries.add(position);
-      }
+      iterator.redeliver(position);
 
       synchronized (consumedPages)
       {
@@ -1245,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription
 
    }
 
-   private class CursorIterator implements LinkedListIterator<PagedReference>
+   private class CursorIterator implements PageIterator
    {
       private PagePosition position = null;
 
@@ -1255,6 +1249,9 @@ final class PageSubscriptionImpl implements PageSubscription
 
       private volatile PagedReference lastRedelivery = null;
 
+      // We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
+      private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
+
       /**
        * next element taken on hasNext test.
        * it has to be delivered on next next operation
@@ -1265,6 +1262,14 @@ final class PageSubscriptionImpl implements PageSubscription
       {
       }
 
+      public void redeliver(PagePosition reference)
+      {
+         synchronized (redeliveries)
+         {
+            redeliveries.add(reference);
+         }
+      }
+
       public void repeat()
       {
          if (isredelivery)
@@ -1390,7 +1395,7 @@ final class PageSubscriptionImpl implements PageSubscription
                   }
                   else
                   {
-                     if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+                     if (tx.deliverAfterCommit(CursorIterator.this, PageSubscriptionImpl.this,
message.getPosition()))
                      {
                         valid = false;
                         ignored = false;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java
index 0a9f815..80bdd86 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java
@@ -23,9 +23,9 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.api.core.ActiveMQBuffer;
-import org.apache.activemq.api.core.Pair;
 import org.apache.activemq.core.paging.PageTransactionInfo;
 import org.apache.activemq.core.paging.PagingManager;
+import org.apache.activemq.core.paging.cursor.PageIterator;
 import org.apache.activemq.core.paging.cursor.PagePosition;
 import org.apache.activemq.core.paging.cursor.PageSubscription;
 import org.apache.activemq.core.persistence.StorageManager;
@@ -58,7 +58,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
 
    private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
 
-   private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
+   private List<LateDelivery> lateDeliveries;
 
    // Static --------------------------------------------------------
 
@@ -146,9 +146,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
       if (lateDeliveries != null)
       {
          // This is to make sure deliveries that were touched before the commit arrived will
be delivered
-         for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
+         for (LateDelivery pos : lateDeliveries)
          {
-            pos.getA().redeliver(pos.getB());
+            pos.getSubscription().redeliver(pos.getIterator(), pos.getPagePosition());
          }
          lateDeliveries.clear();
       }
@@ -225,9 +225,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
 
       if (lateDeliveries != null)
       {
-         for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
+         for (LateDelivery pos : lateDeliveries)
          {
-            pos.getA().lateDeliveryRollback(pos.getB());
+            pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
          }
          lateDeliveries = null;
       }
@@ -245,12 +245,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
    }
 
    @Override
-   public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
+   public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription
cursor, PagePosition cursorPos)
    {
       if (committed && useRedelivery)
       {
          cursor.addPendingDelivery(cursorPos);
-         cursor.redeliver(cursorPos);
+         cursor.redeliver(iterator, cursorPos);
          return true;
       }
       else if (committed)
@@ -267,10 +267,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
          useRedelivery = true;
          if (lateDeliveries == null)
          {
-            lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
+            lateDeliveries = new LinkedList<>();
          }
          cursor.addPendingDelivery(cursorPos);
-         lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
+         lateDeliveries.add(new LateDelivery(cursor, cursorPos, iterator));
          return true;
       }
    }
@@ -283,6 +283,41 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
 
    // Inner classes -------------------------------------------------
 
+   /** a Message shouldn't be delivered until it's committed
+    *  For that reason the page-reference will be written right away
+    *  But in certain cases we can only deliver after the commit
+    *  For that reason we will perform a late delivery
+    *  through the method redeliver.
+    */
+   private static class LateDelivery
+   {
+      final PageSubscription subscription;
+      final PagePosition pagePosition;
+      final PageIterator iterator;
+
+      public LateDelivery(PageSubscription subscription, PagePosition pagePosition, PageIterator
iterator)
+      {
+         this.subscription = subscription;
+         this.pagePosition = pagePosition;
+         this.iterator = iterator;
+      }
+
+      public PageSubscription getSubscription()
+      {
+         return subscription;
+      }
+
+      public PagePosition getPagePosition()
+      {
+         return pagePosition;
+      }
+
+      public PageIterator getIterator()
+      {
+         return iterator;
+      }
+   }
+
 
    private static class UpdatePageTXOperation extends TransactionOperationAbstract
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e0b0b6bf/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java
index b35c5ea..5e6d9eb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java
@@ -16,10 +16,17 @@
  */
 package org.apache.activemq.tests.integration.paging;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.Message;
 import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.api.core.client.ClientConsumer;
 import org.apache.activemq.api.core.client.ClientMessage;
@@ -27,9 +34,14 @@ import org.apache.activemq.api.core.client.ClientProducer;
 import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.client.ClientSessionFactory;
 import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.MessageReference;
+import org.apache.activemq.core.server.Queue;
 import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.apache.activemq.tests.util.ServiceTestBase;
+import org.apache.activemq.utils.LinkedListIterator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,7 +69,9 @@ public class PagingSendTest extends ServiceTestBase
    public void setUp() throws Exception
    {
       super.setUp();
+      Configuration config = new ConfigurationImpl();
       server = newActiveMQServer();
+
       server.start();
       waitForServer(server);
       locator = createFactory(isNetty());
@@ -224,4 +238,146 @@ public class PagingSendTest extends ServiceTestBase
 
       assertEquals(0, errors.get());
    }
-}
\ No newline at end of file
+
+   @Test
+   public void testPagingDoesNotDuplicateBatchMessages() throws Exception
+   {
+      int batchSize = 20;
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, false);
+
+      // Create a queue
+      SimpleString queueAddr = new SimpleString("testQueue");
+      session.createQueue(queueAddr, queueAddr, null, true);
+
+      // Set up paging on the queue address
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setPageSizeBytes(10 * 1024);
+      /** This actually causes the address to start paging messages after 10 x messages with
1024 payload is sent.
+       Presumably due to additional meta-data, message headers etc... **/
+      addressSettings.setMaxSizeBytes(16 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+
+      sendMessageBatch(batchSize, session, queueAddr);
+
+      Queue queue = server.locateQueue(queueAddr);
+
+      checkBatchMessagesAreNotPagedTwice(queue);
+
+      for (int i = 0; i < 10; i++)
+      {
+         // execute the same count a couple times. This is to make sure the iterators have
no impact regardless
+         // the number of times they are called
+         assertEquals(batchSize, processCountThroughIterator(queue));
+      }
+
+   }
+
+   @Test
+   public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception
+   {
+      int batchSize = 20;
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, false);
+
+      // Create a queue
+      SimpleString queueAddr = new SimpleString("testQueue");
+      session.createQueue(queueAddr, queueAddr, null, true);
+
+      // Set up paging on the queue address
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setPageSizeBytes(10 * 1024);
+      /** This actually causes the address to start paging messages after 10 x messages with
1024 payload is sent.
+       Presumably due to additional meta-data, message headers etc... **/
+      addressSettings.setMaxSizeBytes(16 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+
+      int numberOfMessages = 0;
+      // ensure the server is paging
+      while (!server.getPagingManager().getPageStore(queueAddr).isPaging())
+      {
+         sendMessageBatch(batchSize, session, queueAddr);
+         numberOfMessages += batchSize;
+
+      }
+
+      sendMessageBatch(batchSize, session, queueAddr);
+      numberOfMessages += batchSize;
+
+      Queue queue = server.locateQueue(queueAddr);
+      checkBatchMessagesAreNotPagedTwice(queue);
+
+      for (int i = 0; i < 10; i++)
+      {
+         // execute the same count a couple times. This is to make sure the iterators have
no impact regardless
+         // the number of times they are called
+         assertEquals(numberOfMessages, processCountThroughIterator(queue));
+      }
+   }
+
+   public List<String> sendMessageBatch(int batchSize, ClientSession session, SimpleString
queueAddr) throws ActiveMQException
+   {
+      List<String> messageIds = new ArrayList<String>();
+      ClientProducer producer = session.createProducer(queueAddr);
+      for (int i = 0; i < batchSize; i++)
+      {
+         Message message = session.createMessage(true);
+         message.getBodyBuffer().writeBytes(new byte[1024]);
+         String id = UUID.randomUUID().toString();
+         message.putStringProperty("id", id);
+         message.putIntProperty("seq", i); // this is to make the print-data easier to debug
+         messageIds.add(id);
+         producer.send(message);
+      }
+      session.commit();
+
+      return messageIds;
+   }
+
+   /**
+    * Checks that queue.totalIterator() never returns two references with the same "id" property.
+    * Fails the test if any duplicate id is seen.
+    */
+   public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception
+   {
+      LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+
+      Set<String> messageOrderSet = new HashSet<String>();
+
+      int duplicates = 0;
+      while (pageIterator.hasNext())
+      {
+         MessageReference reference = pageIterator.next();
+
+         String id = reference.getMessage().getStringProperty("id");
+
+         // If add(id) returns false it means that this id was already added to this set.
 Hence a duplicate is found.
+         if (!messageOrderSet.add(id))
+         {
+            duplicates++;
+         }
+      }
+      assertTrue(duplicates == 0);
+   }
+
+   /**
+    * Counts the references returned by a fresh queue.totalIterator().
+    * The tests call this repeatedly to check that extra iterators do not change the count.
+    */
+   protected int processCountThroughIterator(Queue queue) throws Exception
+   {
+      LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+
+      int count = 0;
+      while (pageIterator.hasNext())
+      {
+         MessageReference reference = pageIterator.next();
+         count++;
+      }
+      return count;
+   }
+}


Mime
View raw message