activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1059 option to monitor Paging counters
Date Thu, 23 Mar 2017 14:36:02 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 633b9c75d -> f798178c6


ARTEMIS-1059 option to monitor Paging counters

Adding System.property artemis.debug.paging.interval (in seconds)
to debug paging counters.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1c88c06a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1c88c06a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1c88c06a

Branch: refs/heads/master
Commit: 1c88c06abb1d1ac93148bd59c2c9b307df574e83
Parents: 633b9c7
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Mar 22 12:17:20 2017 -0400
Committer: Justin Bertram <jbertram@apache.org>
Committed: Thu Mar 23 09:35:40 2017 -0500

----------------------------------------------------------------------
 .../artemis/core/paging/PagingManager.java      |  4 ++
 .../artemis/core/paging/PagingStoreFactory.java | 11 ++++
 .../core/paging/impl/PagingManagerImpl.java     | 41 ++++++++++++++
 .../paging/impl/PagingStoreFactoryDatabase.java | 10 ++++
 .../core/paging/impl/PagingStoreFactoryNIO.java | 10 ++++
 .../tests/integration/client/ConsumerTest.java  | 56 ++++++++++++++++++++
 6 files changed, 132 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 35d2235..4d472e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -107,4 +107,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
 
    boolean isDiskFull();
 
+   default long getGlobalSize() {
+      return 0;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index a90fd44..75799d2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging;
 
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -49,4 +50,14 @@ public interface PagingStoreFactory {
 
    void injectMonitor(FileStoreMonitor monitor) throws Exception;
 
+   default ScheduledExecutorService getScheduledExecutor() {
+      return null;
+   }
+
+   default Executor newExecutor() {
+      return null;
+   }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 8c2e1f2..e036c16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -31,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -40,6 +42,8 @@ import org.jboss.logging.Logger;
 
 public final class PagingManagerImpl implements PagingManager {
 
+   private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval",
"0"));
+
    private static final Logger logger = Logger.getLogger(PagingManagerImpl.class);
 
    private volatile boolean started = false;
@@ -62,6 +66,8 @@ public final class PagingManagerImpl implements PagingManager {
 
    private final AtomicLong globalSizeBytes = new AtomicLong(0);
 
+   private final AtomicLong numberOfMessages = new AtomicLong(0);
+
    private final long maxSize;
 
    private volatile boolean cleanupEnabled = true;
@@ -70,6 +76,8 @@ public final class PagingManagerImpl implements PagingManager {
 
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions
= new ConcurrentHashMap<>();
 
+   private ActiveMQScheduledComponent scheduledComponent = null;
+
    // Static
    // --------------------------------------------------------------------------------------------------------------------------
 
@@ -109,6 +117,13 @@ public final class PagingManagerImpl implements PagingManager {
 
    @Override
    public PagingManagerImpl addSize(int size) {
+
+      if (size > 0) {
+         numberOfMessages.incrementAndGet();
+      } else {
+         numberOfMessages.decrementAndGet();
+      }
+
       long newSize = globalSizeBytes.addAndGet(size);
 
       if (newSize < 0) {
@@ -121,6 +136,11 @@ public final class PagingManagerImpl implements PagingManager {
       return this;
    }
 
+   @Override
+   public long getGlobalSize() {
+      return globalSizeBytes.get();
+   }
+
    protected void checkMemoryRelease() {
       if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) &&
!blockedStored.isEmpty()) {
          Iterator<PagingStore> storeIterator = blockedStored.iterator();
@@ -314,12 +334,28 @@ public final class PagingManagerImpl implements PagingManager {
 
          reloadStores();
 
+         if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
+            this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(),
pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false)
{
+               @Override
+               public void run() {
+                  debug();
+               }
+            };
+
+            this.scheduledComponent.start();
+
+         }
+
          started = true;
       } finally {
          unlock();
       }
    }
 
+   public void debug() {
+      logger.info("size = " + globalSizeBytes + " bytes, messages = " + numberOfMessages);
+   }
+
    @Override
    public synchronized void stop() throws Exception {
       if (!started) {
@@ -327,6 +363,11 @@ public final class PagingManagerImpl implements PagingManager {
       }
       started = false;
 
+      if (scheduledComponent != null) {
+         this.scheduledComponent.stop();
+         this.scheduledComponent = null;
+      }
+
       lock();
       try {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 7917165..b274848 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -79,6 +79,16 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
    private JDBCSequentialFile directoryList;
 
+   @Override
+   public ScheduledExecutorService getScheduledExecutor() {
+      return scheduledExecutor;
+   }
+
+   @Override
+   public Executor newExecutor() {
+      return executorFactory.getExecutor();
+   }
+
    private boolean started = false;
 
    public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 823baf8..c65b913 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -93,6 +93,16 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
    // Public --------------------------------------------------------
 
    @Override
+   public ScheduledExecutorService getScheduledExecutor() {
+      return scheduledExecutor;
+   }
+
+   @Override
+   public Executor newExecutor() {
+      return executorFactory.getExecutor();
+   }
+
+   @Override
    public void stop() {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c88c06a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 037385f..4a0ef04 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -308,8 +308,51 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
 
+   public void internalSimpleSend(int protocolSender, int protocolConsumer) throws Throwable
{
+
+      ConnectionFactory factorySend = createFactory(protocolSender);
+      ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend
: createFactory(protocolConsumer);
+
+
+      Connection connection = factorySend.createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(QUEUE.toString());
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage msg = session.createTextMessage("hello");
+         msg.setIntProperty("mycount", 0);
+         producer.send(msg);
+         connection.close();
+
+         connection = factoryConsume.createConnection();
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(QUEUE.toString());
+
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         TextMessage message = (TextMessage) consumer.receive(1000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(0, message.getIntProperty("mycount"));
+         Assert.assertEquals("hello", message.getText());
+
+         Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
+         Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
+
+      } finally {
+         connection.close();
+      }
+   }
+
+
    public void internalSend(int protocolSender, int protocolConsumer) throws Throwable {
 
+      internalSimpleSend(protocolSender, protocolConsumer);
+
       ConnectionFactory factorySend = createFactory(protocolSender);
       ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend
: createFactory(protocolConsumer);
 
@@ -414,6 +457,19 @@ public class ConsumerTest extends ActiveMQTestBase {
          TextMessage msg = (TextMessage) consumer.receive(1000);
          Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText());
 
+         consumer.close();
+
+         consumer = session.createConsumer(queue);
+         msg = (TextMessage)consumer.receive(5000);
+         Assert.assertNotNull(msg);
+
+         Assert.assertNull(consumer.receiveNoWait());
+
+         Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
+
+
+         Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
+
       } finally {
          connection.close();
       }


Mime
View raw message