activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [01/17] activemq-artemis git commit: ARTEMIS-812 The countDelta attribute showing negative values [Forced Update!]
Date Mon, 24 Oct 2016 13:35:07 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 60bbb3fbd -> 6d41d37e9 (forced update)


ARTEMIS-812 The countDelta attribute showing negative values


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c16ba26
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c16ba26
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c16ba26

Branch: refs/heads/ARTEMIS-780
Commit: 9c16ba26becebd69547e6eb16d384cdd58d2cc45
Parents: e44c99d
Author: Howard Gao <howard.gao@gmail.com>
Authored: Fri Oct 21 17:13:41 2016 +0800
Committer: Howard Gao <howard.gao@gmail.com>
Committed: Fri Oct 21 17:13:41 2016 +0800

----------------------------------------------------------------------
 .../impl/ActiveMQServerControlImpl.java         |   5 +-
 .../paging/cursor/PageSubscriptionCounter.java  |   3 +
 .../impl/PageSubscriptionCounterImpl.java       |  14 +-
 .../core/paging/impl/PagingStoreImpl.java       |   1 -
 .../core/server/ActiveMQMessageBundle.java      |   4 +-
 .../core/server/ActiveMQServerLogger.java       |   5 +
 .../artemis/core/server/impl/QueueImpl.java     |  15 +-
 .../ClusteredMessageCounterTest.java            | 318 +++++++++++++++++++
 .../management/ActiveMQServerControlTest.java   |   9 +-
 9 files changed, 359 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index b96fcbf..5918ec4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -867,7 +867,10 @@ public class ActiveMQServerControlImpl extends AbstractControl implements
Active
       clearIO();
       try {
          if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD) {
-            throw ActiveMQMessageBundle.BUNDLE.invalidMessageCounterPeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
+            if (newPeriod <= 0) {
+               throw ActiveMQMessageBundle.BUNDLE.periodMustGreaterThanZero(newPeriod);
+            }
+            ActiveMQServerLogger.LOGGER.invalidMessageCounterPeriod(newPeriod);
          }
 
          if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod())
{

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 343f936..2e1d7b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -21,6 +21,9 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public interface PageSubscriptionCounter {
 
+   //incremental counter of messages added
+   long getValueAdded();
+
    long getValue();
 
    void increment(Transaction tx, int add) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 92f313b..e01098d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -61,6 +61,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
 
    private final AtomicLong value = new AtomicLong(0);
 
+   private final AtomicLong added = new AtomicLong(0);
+
    private final AtomicLong pendingValue = new AtomicLong(0);
 
    private final LinkedList<Long> incrementRecords = new LinkedList<>();
@@ -93,6 +95,11 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
    }
 
    @Override
+   public long getValueAdded() {
+      return added.get() + pendingValue.get();
+   }
+
+   @Override
    public long getValue() {
       return value.get() + pendingValue.get();
    }
@@ -205,6 +212,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
          this.subscription.notEmpty();
       }
       this.value.set(value1);
+      this.added.set(value1);
       this.recordID = recordID1;
    }
 
@@ -243,6 +251,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
 
             recordID = -1;
             value.set(0);
+            added.set(0);
             incrementRecords.clear();
          }
       } finally {
@@ -269,6 +278,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
 
          for (Pair<Long, Integer> incElement : loadList) {
             value.addAndGet(incElement.getB());
+            added.addAndGet(incElement.getB());
             incrementRecords.add(incElement.getA());
          }
          loadList.clear();
@@ -279,7 +289,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
{
    @Override
    public synchronized void addInc(long id, int variance) {
       value.addAndGet(variance);
-
+      if (variance > 0) {
+         added.addAndGet(variance);
+      }
       if (id >= 0) {
          incrementRecords.add(id);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f756edd..f05ace0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -876,7 +876,6 @@ public class PagingStoreImpl implements PagingStore {
       }
 
       for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
-         q.getPageSubscription().getCounter().increment(tx, 1);
          q.getPageSubscription().notEmpty();
          ids[i++] = q.getID();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index c87bd11..f22873b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -194,8 +194,8 @@ public interface ActiveMQMessageBundle {
    @Message(id = 119046, value = "invalid value: {0} count must be greater than 0", format
= Message.Format.MESSAGE_FORMAT)
    IllegalArgumentException greaterThanZero(Integer count);
 
-   @Message(id = 119047, value = "Cannot set Message Counter Sample Period < {0}ms", format
= Message.Format.MESSAGE_FORMAT)
-   IllegalArgumentException invalidMessageCounterPeriod(Long period);
+   @Message(id = 119047, value = "invalid value: {0} sample period must be greater than 0",
format = Message.Format.MESSAGE_FORMAT)
+   IllegalArgumentException periodMustGreaterThanZero(Long newPeriod);
 
    @Message(id = 119048, value = "invalid new Priority value: {0}. It must be between 0 and
9 (both included)", format = Message.Format.MESSAGE_FORMAT)
    IllegalArgumentException invalidNewPriority(Integer period);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ae07a8f..24432a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1516,4 +1516,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224069, value = "Change detected in broker configuration file, but reload
failed", format = Message.Format.MESSAGE_FORMAT)
    void configurationReloadFailed(@Cause Throwable t);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format =
Message.Format.MESSAGE_FORMAT)
+   void invalidMessageCounterPeriod(long value);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d515b3d..7c8ad0a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -518,7 +518,9 @@ public class QueueImpl implements Queue {
 
       directDeliver = false;
 
-      messagesAdded++;
+      if (!ref.isPaged()) {
+         messagesAdded++;
+      }
    }
 
    @Override
@@ -573,7 +575,9 @@ public class QueueImpl implements Queue {
    protected boolean scheduleIfPossible(MessageReference ref) {
       if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
          synchronized (this) {
-            messagesAdded++;
+            if (!ref.isPaged()) {
+               messagesAdded++;
+            }
          }
 
          return true;
@@ -1165,7 +1169,7 @@ public class QueueImpl implements Queue {
    @Override
    public long getMessagesAdded() {
       if (pageSubscription != null) {
-         return messagesAdded + pageSubscription.getCounter().getValue() - pagedReferences.get();
+         return messagesAdded + pageSubscription.getCounter().getValueAdded();
       } else {
          return messagesAdded;
       }
@@ -1819,7 +1823,10 @@ public class QueueImpl implements Queue {
       while ((ref = intermediateMessageReferences.poll()) != null) {
          internalAddTail(ref);
 
-         messagesAdded++;
+         if (!ref.isPaged()) {
+            messagesAdded++;
+         }
+
          if (added++ > MAX_DELIVERIES_IN_LOOP) {
             // if we just keep polling from the intermediate we could starve in case there's
a sustained load
             deliverAsync();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest.java
new file mode 100644
index 0000000..a8ad897
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredMessageCounterTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ClusteredMessageCounterTest extends ClusterTestBase {
+   private AtomicInteger total = new AtomicInteger();
+   private AtomicBoolean stopFlag = new AtomicBoolean();
+   private Timer timer1 = new Timer();
+   private Timer timer2 = new Timer();
+   private int numMsg = 1000;
+   private List<MessageCounterInfo> results = new ArrayList<>();
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      setupServers();
+      setupClusters();
+      total.set(0);
+      stopFlag.set(false);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      timer1.cancel();
+      timer2.cancel();
+      super.tearDown();
+   }
+
+   protected void setupServers() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+   }
+
+   protected void setupClusters() {
+      setupClusterConnection("cluster0", 0, 1, "queues", MessageLoadBalancingType.ON_DEMAND,
1, isNetty(), false);
+      setupClusterConnection("cluster1", 1, 0, "queues", MessageLoadBalancingType.ON_DEMAND,
1, isNetty(), false);
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Override
+   protected ConfigurationImpl createBasicConfig(final int serverID) {
+      ConfigurationImpl config = super.createBasicConfig(serverID);
+      Map<String, AddressSettings> addrSettingsMap = config.getAddressesSettings();
+      AddressSettings addrSettings = new AddressSettings();
+      addrSettings.setMaxSizeBytes(10 * 1024);
+      addrSettings.setPageSizeBytes(5 * 1024);
+      addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      addrSettingsMap.put("queues", addrSettings);
+      if (serverID == 1) {
+         config.setMessageCounterEnabled(true);
+      }
+      return config;
+   }
+
+   @Test
+   public void testNonDurableMessageAddedWithPaging() throws Exception {
+      testMessageAddedWithPaging(false);
+   }
+
+   @Test
+   public void testDurableMessageAddedWithPaging() throws Exception {
+      testMessageAddedWithPaging(true);
+   }
+
+   //messages flow from one node to another, in paging mode
+   //check the messageAdded is correct.
+   private void testMessageAddedWithPaging(boolean durable) throws Exception {
+      startServers(0, 1);
+      numMsg = 100;
+
+      try {
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+
+         createQueue(0, "queues", "queue0", null, false);
+         createQueue(1, "queues", "queue0", null, false);
+
+         waitForBindings(1, "queues", 1, 0, true);
+         waitForBindings(0, "queues", 1, 0, false);
+
+         addConsumer(1, 1, "queue0", null);
+
+         System.out.println("sending.....");
+         send(0, "queues", numMsg, durable, null);
+
+         verifyReceiveAllOnSingleConsumer(true, numMsg, 1);
+
+         QueueControl control = (QueueControl) servers[1].getManagementService().getResource(ResourceNames.CORE_QUEUE
+ "queue0");
+
+         //wait up to 30sec to allow the counter get updated
+         long timeout = 30000;
+         while (timeout > 0 && (numMsg != control.getMessagesAdded())) {
+            Thread.sleep(1000);
+            timeout -= 1000;
+         }
+         assertEquals(numMsg, control.getMessagesAdded());
+      } finally {
+         stopServers(0, 1);
+      }
+   }
+
+   @Test
+   public void testMessageCounterWithPaging() throws Exception {
+      startServers(0, 1);
+
+      try {
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+
+         createQueue(0, "queues", "queue0", null, false);
+         createQueue(1, "queues", "queue0", null, false);
+
+         waitForBindings(1, "queues", 1, 0, true);
+         waitForBindings(0, "queues", 1, 0, false);
+
+         System.out.println("sending.....");
+         Thread sendThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  send(0, "queues", numMsg, true, null);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               }
+               System.out.println("messages sent.");
+            }
+         });
+
+         QueueControl control = (QueueControl) servers[1].getManagementService().getResource(ResourceNames.CORE_QUEUE
+ "queue0");
+         ActiveMQServerControl serverControl = (ActiveMQServerControl) servers[1].getManagementService().getResource(ResourceNames.CORE_SERVER);
+         serverControl.setMessageCounterSamplePeriod(300);
+
+         CountDownLatch resultLatch = new CountDownLatch(40);
+
+         MessageCounterCollector collector = new MessageCounterCollector(control, resultLatch);
+         timer1.schedule(collector, 0);
+
+         PeriodicalReceiver receiver = new PeriodicalReceiver(50, 1, 100);
+         timer2.schedule(receiver, 0);
+
+         sendThread.start();
+
+         try {
+            resultLatch.await(120, TimeUnit.SECONDS);
+         } finally {
+            stopFlag.set(true);
+         }
+         sendThread.join();
+         System.out.println("Results collected: " + results.size());
+         //checking
+         for (MessageCounterInfo info : results) {
+            assertTrue("countDelta should be positive " + info.getCountDelta() + dumpResults(results),
info.getCountDelta() >= 0);
+         }
+      } finally {
+         timer1.cancel();
+         timer2.cancel();
+         stopServers(0, 1);
+      }
+   }
+
+   private String dumpResults(List<MessageCounterInfo> results) {
+      StringBuilder builder = new StringBuilder("\n");
+      for (int i = 0; i < results.size(); i++) {
+         builder.append("result[" + i + "]: " + results.get(i).getCountDelta() + " " + results.get(i).getCount()
+ "\n");
+      }
+      return builder.toString();
+   }
+
+   //Periodically read the counter
+   private class MessageCounterCollector extends TimerTask {
+      private QueueControl queueControl;
+      private CountDownLatch resultLatch;
+
+      MessageCounterCollector(QueueControl queueControl, CountDownLatch resultLatch) {
+         this.queueControl = queueControl;
+         this.resultLatch = resultLatch;
+      }
+
+      @Override
+      public void run() {
+         if (stopFlag.get()) {
+            return;
+         }
+         try {
+            String result = queueControl.listMessageCounter();
+            MessageCounterInfo info = MessageCounterInfo.fromJSON(result);
+            if (info.getCountDelta() != 0) {
+               System.out.println("non zero value got ---> " + info.getCountDelta());
+            }
+            results.add(info);
+            resultLatch.countDown();
+            if (info.getCountDelta() < 0) {
+               //stop and make the test finish quick
+               stopFlag.set(true);
+               while (resultLatch.getCount() > 0) {
+                  resultLatch.countDown();
+               }
+            }
+         } catch (Exception e) {
+            e.printStackTrace();
+         } finally {
+            if (!stopFlag.get()) {
+               timer1.schedule(new MessageCounterCollector(this.queueControl, resultLatch),
200);
+            }
+         }
+      }
+   }
+
+   //Peroidically receive a number of messages
+   private class PeriodicalReceiver extends TimerTask {
+      private int batchSize;
+      private int serverID;
+      private long period;
+
+      PeriodicalReceiver(int batchSize, int serverID, long period) {
+         this.batchSize = batchSize;
+         this.serverID = serverID;
+         this.period = period;
+      }
+
+      @Override
+      public void run() {
+         if (stopFlag.get()) {
+            return;
+         }
+         int num = 0;
+         ClientSessionFactory sf = sfs[serverID];
+         ClientSession session = null;
+         ClientConsumer consumer = null;
+         try {
+            session = sf.createSession(false, true, false);
+            consumer = session.createConsumer("queue0", null);
+            session.start();
+            for (; num < batchSize || stopFlag.get(); num++) {
+               ClientMessage message = consumer.receive(2000);
+               if (message == null) {
+                  System.out.println("No more messages received!");
+                  break;
+               }
+               message.acknowledge();
+            }
+            session.commit();
+         } catch (ActiveMQException e) {
+            e.printStackTrace();
+         } finally {
+            System.out.println("received messages: " + num);
+            if (consumer != null) {
+               try {
+                  consumer.close();
+               } catch (ActiveMQException e) {
+                  e.printStackTrace();
+               }
+            }
+            if (session != null) {
+               try {
+                  session.close();
+               } catch (ActiveMQException e) {
+                  e.printStackTrace();
+               }
+            }
+
+            //we only receive (numMsg - 200) to avoid the paging being cleaned up
+            //when all paged messages are consumed.
+            if (!stopFlag.get() && total.addAndGet(num) < numMsg - 200) {
+               System.out.println("go for another batch " + total.get());
+               timer2.schedule(new PeriodicalReceiver(this.batchSize, this.serverID, this.period),
period);
+            }
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c16ba26/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 86d19db..d040b8a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -414,13 +414,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       } catch (Exception e) {
       }
 
-      try {
-         serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD
- 1);
-         Assert.fail();
-      } catch (Exception e) {
-      }
+      //this only gets warning now and won't cause exception.
+      serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD
- 1);
 
-      Assert.assertEquals(newSample, serverControl.getMessageCounterSamplePeriod());
+      Assert.assertEquals(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD - 1, serverControl.getMessageCounterSamplePeriod());
    }
 
    protected void restartServer() throws Exception {


Mime
View raw message