activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-6 git commit: ACTIVEMQ6-54 Fixing tests broken after Paging fix
Date Thu, 11 Dec 2014 18:29:20 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 0eb6ebda2 -> 1491f4a12


ACTIVEMQ6-54 Fixing tests broken after Paging fix

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

Changing the order of depaging introduced an extra check that needs to be checked now.
This will probably take care of the issue by checking if the page is complete before depage.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/09490cdb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/09490cdb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/09490cdb

Branch: refs/heads/master
Commit: 09490cdba3278f53980a906f35893e7b4c57d7fd
Parents: 0eb6ebd
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Dec 10 22:03:10 2014 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Dec 10 22:06:35 2014 -0500

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  54 +++++----
 .../tests/integration/client/PagingTest.java    | 114 +++++++++++++++++++
 2 files changed, 146 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
index 32f30e8..3df0b78 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements PageCursorProvider
             // on that case we need to move to verify it in a different way
             if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages()
> 0)
             {
-               boolean complete = true;
-
-               for (PageSubscription cursor : cursorList)
-               {
-                  if (!cursor.isComplete(minPage))
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered
incomplete at page " + minPage);
-                     }
-
-                     complete = false;
-                     break;
-                  }
-                  else
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered
**complete** at page " + minPage);
-                     }
-                  }
-               }
+               boolean complete = checkPageCompletion(cursorList, minPage);
 
                if (!pagingStore.isStarted())
                {
@@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements PageCursorProvider
 
             for (long i = pagingStore.getFirstPage(); i < minPage; i++)
             {
+               if (!checkPageCompletion(cursorList, i))
+               {
+                  break;
+               }
                Page page = pagingStore.depage();
                if (page == null)
                {
@@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements PageCursorProvider
 
    }
 
+
+   private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long
minPage)
+   {
+      boolean complete = true;
+
+      for (PageSubscription cursor : cursorList)
+      {
+         if (!cursor.isComplete(minPage))
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete
at page " + minPage);
+            }
+
+            complete = false;
+            break;
+         }
+         else
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete**
at page " + minPage);
+            }
+         }
+      }
+      return complete;
+   }
    /**
     * @return
     */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index ff333ab..0e4751d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -113,6 +113,120 @@ public class PagingTest extends ServiceTestBase
    }
 
    @Test
+   public void testPageOnLargeMessageMultipleQueues() throws Exception
+   {
+      Configuration config = createDefaultConfig();
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
+
+      AddressSettings value = new AddressSettings();
+      map.put(ADDRESS.toString(), value);
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      final int numberOfBytes = 1024;
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+      session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);
+      session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = null;
+
+      for (int i = 0; i < 201; i++)
+      {
+         message = session.createMessage(true);
+
+         message.getBodyBuffer().writerIndex(0);
+
+         message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+         for (int j = 1; j <= numberOfBytes; j++)
+         {
+            message.getBodyBuffer().writeInt(j);
+         }
+
+         producer.send(message);
+      }
+
+
+      session.close();
+
+      server.stop();
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      sf = createSessionFactory(locator);
+
+      for (int ad = 0; ad < 2; ad++)
+      {
+         session = sf.createSession(false, false, false);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad));
+
+         session.start();
+
+         for (int i = 0; i < 201; i++)
+         {
+            ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+            Assert.assertNotNull(message2);
+
+            message2.acknowledge();
+
+            Assert.assertNotNull(message2);
+         }
+
+         try
+         {
+            if (ad > -1)
+            {
+               session.commit();
+            }
+            else
+            {
+               session.rollback();
+               for (int i = 0; i < 100; i++)
+               {
+                  ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+                  Assert.assertNotNull(message2);
+
+                  message2.acknowledge();
+
+                  Assert.assertNotNull(message2);
+               }
+               session.commit();
+
+            }
+         }
+         catch (Throwable e)
+         {
+            System.err.println("here!!!!!!!");
+            e.printStackTrace();
+            System.exit(-1);
+         }
+
+         consumer.close();
+
+         session.close();
+      }
+   }
+
+   @Test
    public void testPageCleanup() throws Exception
    {
       clearDataRecreateServerDirs();


Mime
View raw message