activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [2/3] activemq-6 git commit: ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards paging
Date Mon, 08 Dec 2014 11:29:26 GMT
ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards
paging

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

This is fixing a few issues around paging:
- Browsing is not looking towards Paging. I'm using the queue.totalIterator which is a read-only
iterator that goes towards the paged messages.
- Depage is not kicking in correctly in some scenarios. I have improved the logic on scheduling
depage for that.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/933d90a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/933d90a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/933d90a4

Branch: refs/heads/master
Commit: 933d90a4f383c9dd0e66f0a3712bcb2f7561765c
Parents: aec50cf
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Dec 5 12:25:07 2014 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Dec 5 14:48:14 2014 -0500

----------------------------------------------------------------------
 .../activemq/core/server/impl/QueueImpl.java    |  26 ++++-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../tests/integration/client/PagingTest.java    | 116 +++++++++++++++++++
 3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
index 6703c91..9523205 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
@@ -664,6 +664,8 @@ public class QueueImpl implements Queue
             // no-op
             scheduledRunners.decrementAndGet();
          }
+
+         checkDepage();
       }
 
    }
@@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue
          }
       }
 
-      if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging()
&& pageIterator.hasNext() && !depagePending)
+      checkDepage();
+   }
+
+   private void checkDepage()
+   {
+      if (pageIterator != null && pageSubscription.isPaging() && !depagePending
&& needsDepage() && pageIterator.hasNext())
       {
          scheduleDepage(false);
       }
    }
 
+
+   /**
+    * This is a common check we do before scheduling depaging.. or while depaging.
+    * Before scheduling a depage runnable we verify if it fits / needs depaging.
+    * We also check for while needsDepage While depaging.
+    * This is just to avoid a copy & paste dependency
+    * @return
+    */
+   private boolean needsDepage()
+   {
+      return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
+   }
+
+
+
    private SimpleString extractGroupID(MessageReference ref)
    {
       if (internalQueue)
@@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue
       this.directDeliver = false;
 
       int depaged = 0;
-      while (timeout > System.currentTimeMillis() && queueMemorySize.get() <
maxSize && pageIterator.hasNext())
+      while (timeout > System.currentTimeMillis() && needsDepage() &&
pageIterator.hasNext())
       {
          depaged++;
          PagedReference reference = pageIterator.next();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
index da8d094..aa12993 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
@@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
       if (browseOnly)
       {
-         browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
+         browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
       }
       else
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index 56369c8..ff333ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase
     * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler
works properly:
     *
     *   -Djava.util.logging.manager=org.jboss.logmanager.LogManager  -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
+    *
+    *   Note: Idea should get these from the pom and you shouldn't need to do this.
     */
    public void testFailMessagesNonDurable() throws Exception
    {
@@ -5860,6 +5862,120 @@ public class PagingTest extends ServiceTestBase
 
 
    @Test
+   public void testMultiFiltersBrowsing() throws Throwable
+   {
+      internalTestMultiFilters(true);
+   }
+
+   @Test
+   public void testMultiFiltersRegularConsumer() throws Throwable
+   {
+      internalTestMultiFilters(false);
+   }
+
+   public void internalTestMultiFilters(boolean browsing) throws Throwable
+   {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultConfig()
+         .setJournalSyncNonTransactional(false);
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+         locator.setBlockOnDurableSend(true);
+         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSession session = sf.createSession(true, true, 0);
+
+         session.createQueue(ADDRESS.toString(), "Q1", null, true);
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         ClientMessage msg = null;
+         store.startPaging();
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = session.createMessage(true);
+            msg.putStringProperty("color", "red");
+            msg.putIntProperty("count", i);
+            prod.send(msg);
+
+            if (i > 0 && i % 10 == 0)
+            {
+               store.startPaging();
+               store.forceAnotherPage();
+            }
+         }
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = session.createMessage(true);
+            msg.putStringProperty("color", "green");
+            msg.putIntProperty("count", i);
+            prod.send(msg);
+
+            if (i > 0 && i % 10 == 0)
+            {
+               store.startPaging();
+               store.forceAnotherPage();
+            }
+         }
+
+         session.commit();
+
+         session.close();
+
+         session = sf.createSession(false, false, 0);
+         session.start();
+
+
+         ClientConsumer cons1;
+
+         if (browsing)
+         {
+            cons1 = session.createConsumer("Q1", "color='green'", true);
+         }
+         else
+         {
+            cons1 = session.createConsumer("Q1", "color='red'", false);
+         }
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = cons1.receive(5000);
+
+            System.out.println("Received " + msg);
+            assertNotNull(msg);
+            if (!browsing)
+            {
+               msg.acknowledge();
+            }
+         }
+
+         session.commit();
+
+         session.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+
+   }
+
+
+   @Test
    public void testPendingACKOutOfOrder() throws Throwable
    {
       clearDataRecreateServerDirs();


Mime
View raw message