activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-6547] revert mod from AMQ-3143 such that waitForSpace respects child usage, fix test and validate mKahadb blocking send
Date Tue, 14 Feb 2017 11:34:22 GMT
Repository: activemq
Updated Branches:
  refs/heads/master f5baebb00 -> fad50812a


[AMQ-6547] revert mod from AMQ-3143 such that waitForSpace respects child usage, fix test
and validate mKahadb blocking send


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fad50812
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fad50812
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fad50812

Branch: refs/heads/master
Commit: fad50812af792606efca5653f9b4761e663ac087
Parents: f5baebb
Author: gtully <gary.tully@gmail.com>
Authored: Tue Feb 14 11:33:50 2017 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Feb 14 11:33:59 2017 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/usage/StoreUsage.java   | 10 ---
 .../store/kahadb/MKahaDBStoreLimitTest.java     | 76 +++++++++++++++++++-
 .../apache/activemq/usage/StoreUsageTest.java   | 35 ++++++---
 3 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fad50812/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
index 928da83..a639cd6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
@@ -78,16 +78,6 @@ public class StoreUsage extends PercentLimitUsage<StoreUsage> {
         }
     }
 
-    @Override
-    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException
{
-        if (parent != null) {
-            if (parent.waitForSpace(timeout, highWaterMark)) {
-                return true;
-            }
-        }
-
-        return super.waitForSpace(timeout, highWaterMark);
-    }
 
     @Override
     protected void updateLimitBasedOnPercent() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fad50812/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
index 0ce5795..4a8eea9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
@@ -21,6 +21,8 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -32,8 +34,14 @@ import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.*;
 
@@ -160,6 +168,72 @@ public class MKahaDBStoreLimitTest {
 
     }
 
+    @Test
+    public void testExplicitAdapterBlockingProducer() throws Exception {
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024*8);
+        kahaStore.setIndexDirectory(new File(IOHelper.getDefaultDataDirectory()));
+
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(40*1024);
+
+        filtered.setUsage(storeUsage);
+        filtered.setDestination(queueA);
+        filtered.setPersistenceAdapter(kahaStore);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+
+        BrokerService brokerService = createBroker(persistenceAdapter);
+        brokerService.start();
+
+        final AtomicBoolean done = new AtomicBoolean();
+        ExecutorService executor = Executors.newCachedThreadPool();
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    produceMessages(queueA, 20);
+                    done.set(true);
+                } catch (Exception ignored) {
+                }
+            }
+        });
+
+        assertTrue("some messages got to dest", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
+                return baseDestinationA != null && baseDestinationA.getDestinationStatistics().getMessages().getCount()
> 4l;
+            }
+        }));
+
+        BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
+        // loop till producer stalled
+        long enqueues = 0l;
+        do {
+            enqueues = baseDestinationA.getDestinationStatistics().getEnqueues().getCount();
+            LOG.info("Dest Enqueues: " + enqueues);
+            TimeUnit.MILLISECONDS.sleep(500);
+        } while (enqueues != baseDestinationA.getDestinationStatistics().getEnqueues().getCount());
+
+
+        assertFalse("expect producer to block", done.get());
+
+        LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage()
+ ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue("some usage", broker.getSystemUsage().getStoreUsage().getUsage() >
0);
+
+        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue("limited store has more % usage than parent", baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage()
> broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        executor.shutdownNow();
+    }
+
 
     private void consume(Destination queue) throws Exception {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
@@ -180,7 +254,7 @@ public class MKahaDBStoreLimitTest {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(queue);
         BytesMessage bytesMessage = session.createBytesMessage();
-        bytesMessage.writeBytes(new byte[2*1024]);
+        bytesMessage.writeBytes(new byte[1*1024]);
         for (int i = 0; i < count; ++i) {
             producer.send(bytesMessage);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fad50812/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
index 901a325..4723449 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
@@ -20,21 +20,24 @@ package org.apache.activemq.usage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.util.ProducerThread;
 import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Session;
+import java.util.concurrent.TimeUnit;
 
 public class StoreUsageTest extends EmbeddedBrokerTestSupport {
-
-    final int WAIT_TIME_MILLS = 20*1000;
+    private static final Logger LOG = LoggerFactory.getLogger(StoreUsageTest.class);
 
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
-        broker.getSystemUsage().getStoreUsage().setLimit(10 * 1024);
+        broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
         broker.deleteAllMessages();
         return broker;
     }
@@ -52,20 +55,36 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport {
         final ProducerThread producer = new ProducerThread(sess, dest);
         producer.start();
 
+        assertTrue("some messages sent", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                BaseDestination baseDestination = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(dest);
+
+                return baseDestination != null && baseDestination.getDestinationStatistics().getEnqueues().getCount()
> 0;
+            }
+        }));
+
+        BaseDestination baseDestination = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(dest);
+        LOG.info("Sent u: " + baseDestination.getDestinationStatistics().getEnqueues());
+
         // wait for the producer to block
-        Thread.sleep(WAIT_TIME_MILLS / 2);
+        int sent = 0;
+        do {
+            sent = producer.getSentCount();
+            TimeUnit.SECONDS.sleep(1);
+            LOG.info("Sent: " + sent);
+        } while (sent !=  producer.getSentCount());
 
+        LOG.info("Increasing limit! enqueues: " + baseDestination.getDestinationStatistics().getEnqueues().getCount());
         broker.getAdminView().setStoreLimit(1024 * 1024);
 
-        Thread.sleep(WAIT_TIME_MILLS);
-
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return producer.getSentCount() == producer.getMessageCount();
             }
-        }, WAIT_TIME_MILLS * 2);
+        });
 
-        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+        assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount());
+        assertEquals("Enqueues match sent", producer.getSentCount(), baseDestination.getDestinationStatistics().getEnqueues().getCount());
 
     }
 }


Mime
View raw message