activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5837
Date Tue, 07 Jul 2015 21:24:20 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2b320ac06 -> 1dcdf69f3


https://issues.apache.org/jira/browse/AMQ-5837

This commit adds an Inflight message size statistic to SubscriptionStatistics
so we can know the size of all the messages that are inflight, besides just the count.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/46055034
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/46055034
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/46055034

Branch: refs/heads/master
Commit: 46055034c949c7fbce717c93f5d94b4ca18dd23b
Parents: 2b320ac
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Mon Jun 1 18:12:22 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jul 7 16:33:58 2015 -0400

----------------------------------------------------------------------
 .../broker/region/AbstractSubscription.java     |   5 +
 .../broker/region/DurableTopicSubscription.java |   1 +
 .../broker/region/PrefetchSubscription.java     |   7 +
 .../activemq/broker/region/Subscription.java    |   5 +
 .../broker/region/SubscriptionStatistics.java   |  12 +
 .../broker/region/TopicSubscription.java        |  47 +++
 .../region/QueueDuplicatesFromStoreTest.java    |   5 +
 .../region/SubscriptionAddRemoveQueueTest.java  |   5 +
 .../AbstractInflightMessageSizeTest.java        | 295 +++++++++++++++++++
 ...ableSubscriptionInflightMessageSizeTest.java |  61 ++++
 ...ueueSubscriptionInflightMessageSizeTest.java |  61 ++++
 ...opicSubscriptionInflightMessageSizeTest.java |  60 ++++
 12 files changed, 564 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 92d1c0d..d22801c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -228,6 +228,11 @@ public abstract class AbstractSubscription implements Subscription {
     }
 
     @Override
+    public long getInFlightMessageSize() {
+        return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+    }
+
+    @Override
     public int getInFlightUsage() {
         if (info.getPrefetchSize() > 0) {
             return (getInFlightSize() * 100)/info.getPrefetchSize();

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 12c418a..0107c58 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -237,6 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                     savedDispateched = new ArrayList<MessageReference>(dispatched);
                 }
                 dispatched.clear();
+                getSubscriptionStatistics().getInflightMessageSize().reset();
             }
             if (!keepDurableSubsActive && pending.isTransient()) {
                 try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 2a63c33..d148e80 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -176,6 +176,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             pending.remove();
                             createMessageDispatch(node, node.getMessage());
                             dispatched.add(node);
+                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
                             onDispatch(node, node.getMessage());
                         }
                         return;
@@ -240,6 +241,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 }
                 for (final MessageReference node : removeList) {
                     dispatched.remove(node);
+                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                 }
                 // this only happens after a reconnect - get an ack which is not
                 // valid
@@ -257,6 +259,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             getSubscriptionStatistics().getDequeues().increment();
                             ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             dispatched.remove(node);
+                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                         } else {
                             registerRemoveSync(context, node);
                         }
@@ -379,6 +382,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 }
                 for (final MessageReference node : removeList) {
                     dispatched.remove(node);
+                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                 }
                 if (!callDispatchMatched) {
                     throw new JMSException(
@@ -427,6 +431,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         synchronized(dispatchLock) {
                             getSubscriptionStatistics().getDequeues().increment();
                             dispatched.remove(node);
+                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                             nodeDest.getDestinationStatistics().getInflight().decrement();
                         }
                         nodeDest.wakeup();
@@ -620,6 +625,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         for (MessageReference r : dispatched) {
             if (r.getRegionDestination() == destination) {
                 references.add(r);
+                getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
             }
         }
         rc.addAll(references);
@@ -697,6 +703,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         if (node != QueueMessageReference.NULL_MESSAGE) {
             getSubscriptionStatistics().getDispatched().increment();
             dispatched.add(node);
+            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
         }
         if (getPrefetchSize() == 0) {
             while (true) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index ec37512..2c8afed 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -197,6 +197,11 @@ public interface Subscription extends SubscriptionRecovery {
     int getInFlightSize();
 
     /**
+     * @return the size in bytes of the messages awaiting acknowledgement
+     */
+    long getInFlightMessageSize();
+
+    /**
      * @return the in flight messages as a percentage of the prefetch size
      */
     int getInFlightUsage();

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
index 09fab8a..d6a276e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker.region;
 
 import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.SizeStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
 
 /**
@@ -29,6 +30,7 @@ public class SubscriptionStatistics extends StatsImpl {
     protected CountStatisticImpl enqueues;
     protected CountStatisticImpl dequeues;
     protected CountStatisticImpl dispatched;
+    protected SizeStatisticImpl inflightMessageSize;
 
 
     public SubscriptionStatistics() {
@@ -41,11 +43,13 @@ public class SubscriptionStatistics extends StatsImpl {
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription");
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription");
+        inflightMessageSize = new SizeStatisticImpl("inflightMessageSize", "The size in bytes of messages dispatched but awaiting acknowledgement");
 
         addStatistic("consumedCount", consumedCount);
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
+        addStatistic("inflightMessageSize", inflightMessageSize);
 
         this.setEnabled(enabled);
     }
@@ -66,6 +70,10 @@ public class SubscriptionStatistics extends StatsImpl {
         return dispatched;
     }
 
+    public SizeStatisticImpl getInflightMessageSize() {
+        return inflightMessageSize;
+    }
+
     public void reset() {
         if (this.isDoReset()) {
             super.reset();
@@ -73,6 +81,7 @@ public class SubscriptionStatistics extends StatsImpl {
             enqueues.reset();
             dequeues.reset();
             dispatched.reset();
+            inflightMessageSize.reset();
         }
     }
 
@@ -82,6 +91,7 @@ public class SubscriptionStatistics extends StatsImpl {
         enqueues.setEnabled(enabled);
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
+        inflightMessageSize.setEnabled(enabled);
     }
 
     public void setParent(SubscriptionStatistics parent) {
@@ -90,11 +100,13 @@ public class SubscriptionStatistics extends StatsImpl {
             enqueues.setParent(parent.enqueues);
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
+            inflightMessageSize.setParent(parent.inflightMessageSize);
         } else {
             consumedCount.setParent(null);
             enqueues.setParent(null);
             dispatched.setParent(null);
             dequeues.setParent(null);
+            inflightMessageSize.setParent(null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index c59c359..17a3137 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -17,7 +17,11 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -37,6 +41,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.thread.Scheduler;
@@ -71,6 +76,20 @@ public class TopicSubscription extends AbstractSubscription {
     protected boolean active = false;
     protected boolean discarding = false;
 
+
+    /**
+     * This Map is used to keep track of messages that have been dispatched in sorted order to
+     * optimize message acknowledgement
+     */
+    private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>(
+            new Comparator<MessageId>() {
+                @Override
+                public int compare(MessageId m1, MessageId m2) {
+                    return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1
+                            : Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId()));
+                }
+            });
+
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
         this.usageManager = usageManager;
@@ -250,6 +269,8 @@ public class TopicSubscription extends AbstractSubscription {
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         matched.remove();
                         getSubscriptionStatistics().getDispatched().increment();
+                        dispatched.put(node.getMessageId(), node);
+                        getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
                         node.decrementReferenceCount();
                         break;
                     }
@@ -277,6 +298,7 @@ public class TopicSubscription extends AbstractSubscription {
                             }
                         }
                         getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
+                        updateInflightMessageSizeOnAck(ack);
                         dispatchMatched();
                     }
                 });
@@ -289,6 +311,7 @@ public class TopicSubscription extends AbstractSubscription {
                     }
                 }
                 getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
+                updateInflightMessageSizeOnAck(ack);
             }
             while (true) {
                 int currentExtension = prefetchExtension.get();
@@ -379,6 +402,27 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
+    /**
+     * Update the inflight statistics on message ack.  Since a message ack could be a range,
+     * we need to grab a subtree of the dispatched map to acknowledge messages.  Finding the
+     * subMap is an O(log n) operation.
+     * @param ack
+     */
+    private void updateInflightMessageSizeOnAck(final MessageAck ack) {
+        if (ack.getFirstMessageId() != null) {
+            NavigableMap<MessageId, MessageReference> acked = dispatched
+                    .subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true);
+            Iterator<MessageId> i = acked.keySet().iterator();
+            while (i.hasNext()) {
+                getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize());
+                i.remove();
+            }
+        } else {
+            getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize());
+            dispatched.remove(ack.getLastMessageId());
+        }
+    }
+
     @Override
     public int countBeforeFull() {
         return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@@ -602,6 +646,8 @@ public class TopicSubscription extends AbstractSubscription {
         if (node != null) {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
             getSubscriptionStatistics().getDispatched().increment();
+            dispatched.put(node.getMessageId(), node);
+            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
             // Keep track if this subscription is receiving messages from a single destination.
             if (singleDestination) {
                 if (destination == null) {
@@ -683,6 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
             }
         }
         setSlowConsumer(false);
+        dispatched.clear();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index d692d03..99382d0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -366,6 +366,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             public SubscriptionStatistics getSubscriptionStatistics() {
                 return subscriptionStatistics;
             }
+
+            @Override
+            public long getInFlightMessageSize() {
+                return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+            }
         };
 
         queue.addSubscription(contextNotInTx, subscription);

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 50c2136..2541a64 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -376,5 +376,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return subscriptionStatistics;
         }
 
+        @Override
+        public long getInFlightMessageSize() {
+            return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
new file mode 100644
index 0000000..07784e7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.statistics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This test shows Inflight Message sizes are correct for various acknowledgement modes.
+ */
+public abstract class AbstractInflightMessageSizeTest {
+
+    protected BrokerService brokerService;
+    protected Connection connection;
+    protected String brokerUrlString;
+    protected Session session;
+    protected javax.jms.Destination dest;
+    protected Destination amqDestination;
+    protected MessageConsumer consumer;
+    protected int prefetch = 100;
+    final protected int ackType;
+    final protected boolean optimizeAcknowledge;
+    final protected String destName = "testDest";
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {ActiveMQSession.SESSION_TRANSACTED, true},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, true},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
+                {ActiveMQSession.SESSION_TRANSACTED, false},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, false},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
+        });
+    }
+
+    public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+        this.ackType = ackType;
+        this.optimizeAcknowledge = optimizeAcknowledge;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        TransportConnector tcp = brokerService
+                .addConnector("tcp://localhost:0");
+        brokerService.start();
+        //used to test optimizeAcknowledge works
+        String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
+        brokerUrlString = tcp.getPublishableConnectString() + optAckString;
+        connection = createConnectionFactory().createConnection();
+        connection.setClientID("client1");
+        connection.start();
+        session = connection.createSession(ackType == ActiveMQSession.SESSION_TRANSACTED, ackType);
+        dest = getDestination();
+        consumer = getMessageConsumer();
+        amqDestination = TestSupport.getDestination(brokerService, getActiveMQDestination());
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setTopicPrefetch(prefetch);
+        prefetchPolicy.setQueuePrefetch(prefetch);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(prefetch);
+        factory.setPrefetchPolicy(prefetchPolicy);
+        return factory;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        brokerService.stop();
+    }
+
+    /**
+     * Tests that inflight message size goes up and comes back down to 0 after
+     * messages are consumed
+     *
+     * @throws javax.jms.JMSException
+     * @throws InterruptedException
+     */
+    @Test(timeout=15000)
+    public void testInflightMessageSize() throws Exception {
+        final long size = sendMessages(10);
+
+        assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() > size;
+            }
+        }));
+
+        receiveMessages(10);
+
+        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() == 0;
+            }
+        }));
+    }
+
+    /**
+     * Test that the in flight message size won't rise after prefetch is filled
+     *
+     * @throws Exception
+     */
+    @Test(timeout=15000)
+    public void testInflightMessageSizePrefetchFilled() throws Exception {
+        final long size = sendMessages(prefetch);
+
+        assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() > size;
+            }
+        }));
+
+        final long inFlightSize = getSubscription().getInFlightMessageSize();
+        sendMessages(10);
+
+        //Prefetch has been filled, so the size should not change with 10 more messages
+        assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize());
+
+        receiveMessages(prefetch + 10);
+
+        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() == 0;
+            }
+        }));
+    }
+
+    /**
+     * Test that the in flight message size will still rise if prefetch is not filled
+     *
+     * @throws Exception
+     */
+    @Test(timeout=15000)
+    public void testInflightMessageSizePrefetchNotFilled() throws Exception {
+        final long size = sendMessages(prefetch - 10);
+
+        assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() > size;
+            }
+        }));
+
+        //capture the inflight size and send 10 more messages
+        final long inFlightSize = getSubscription().getInFlightMessageSize();
+        sendMessages(10);
+
+        //Prefetch has NOT been filled, so the size should rise with 10 more messages
+        assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() > inFlightSize;
+            }
+        }));
+
+        receiveMessages(prefetch);
+
+        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() == 0;
+            }
+        }));
+    }
+
+
+    /**
+     * Tests that inflight message size goes up and doesn't go down if receive is rolledback
+     *
+     * @throws javax.jms.JMSException
+     * @throws InterruptedException
+     */
+    @Test(timeout=15000)
+    public void testInflightMessageSizeRollback() throws Exception {
+        Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
+
+        final long size = sendMessages(10);
+
+        assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() > size;
+            }
+        }));
+
+       long inFlightSize = getSubscription().getInFlightMessageSize();
+
+        for (int i = 0; i < 10; i++) {
+            consumer.receive();
+        }
+        session.rollback();
+
+        assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize());
+    }
+
+    /**
+     * This method will generate random sized messages up to 150000 bytes.
+     *
+     * @param count
+     * @throws JMSException
+     */
+    protected long sendMessages(int count) throws JMSException {
+        MessageProducer producer = session.createProducer(dest);
+        long totalSize = 0;
+        for (int i = 0; i < count; i++) {
+            Random r = new Random();
+            int size = r.nextInt(150000);
+            totalSize += size;
+            byte[] bytes = new byte[size > 0 ? size : 1];
+            r.nextBytes(bytes);
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(bytes);
+            producer.send(bytesMessage);
+        }
+        if (session.getTransacted()) {
+            session.commit();
+        }
+        return totalSize;
+    }
+
+    protected void receiveMessages(int count) throws JMSException {
+        for (int i = 0; i < count; i++) {
+            javax.jms.Message m = consumer.receive();
+            if (ackType == ActiveMQSession.SESSION_TRANSACTED) {
+                session.commit();
+            } else if (ackType != ActiveMQSession.AUTO_ACKNOWLEDGE) {
+                m.acknowledge();
+            }
+        }
+    }
+
+    protected abstract Subscription getSubscription();
+
+    protected abstract ActiveMQDestination getActiveMQDestination();
+
+    protected abstract MessageConsumer getMessageConsumer() throws JMSException;
+
+    protected abstract javax.jms.Destination getDestination() throws JMSException ;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
new file mode 100644
index 0000000..29d6cb7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.statistics;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Runs the inflight-message-size checks inherited from
+ * {@link AbstractInflightMessageSizeTest} against a durable topic subscription.
+ * The constructor arguments are supplied by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
+
+    /**
+     * @param ackType acknowledgement mode, forwarded to the base class
+     * @param optimizeAcknowledge optimize-acknowledge flag, forwarded to the base class
+     */
+    public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+        super(ackType, optimizeAcknowledge);
+    }
+
+    /** Creates a durable subscriber named {@code sub1} on the test topic. */
+    @Override
+    protected MessageConsumer getMessageConsumer() throws JMSException {
+        final javax.jms.Topic jmsTopic = (javax.jms.Topic) dest;
+        return session.createDurableSubscriber(jmsTopic, "sub1");
+    }
+
+    /**
+     * Looks up the broker-side durable subscription keyed by client id {@code client1}
+     * and subscription name {@code sub1}.
+     * NOTE(review): presumably the base class sets the connection client id to
+     * {@code client1}; confirm against AbstractInflightMessageSizeTest.
+     */
+    @Override
+    protected Subscription getSubscription() {
+        final Topic brokerTopic = (Topic) amqDestination;
+        return brokerTopic.getDurableTopicSubs().get(new SubscriptionKey("client1", "sub1"));
+    }
+
+    /** Creates the JMS topic named by {@code destName} on the test session. */
+    @Override
+    protected javax.jms.Topic getDestination() throws JMSException {
+        return session.createTopic(destName);
+    }
+
+    /** Builds the ActiveMQ topic object for {@code destName}. */
+    @Override
+    protected ActiveMQDestination getActiveMQDestination() {
+        return new ActiveMQTopic(destName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
new file mode 100644
index 0000000..84ddc71
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.statistics;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Runs the inflight-message-size checks inherited from
+ * {@link AbstractInflightMessageSizeTest} against a queue subscription.
+ * The constructor arguments are supplied by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
+
+    /**
+     * @param ackType acknowledgement mode, forwarded to the base class
+     * @param optimizeAcknowledge optimize-acknowledge flag, forwarded to the base class
+     */
+    public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+        super(ackType, optimizeAcknowledge);
+    }
+
+    /** Creates a plain consumer on the test queue. */
+    @Override
+    protected MessageConsumer getMessageConsumer() throws JMSException {
+        return session.createConsumer(dest);
+    }
+
+    /** Returns the first consumer registered on the broker-side queue. */
+    @Override
+    protected Subscription getSubscription() {
+        final Queue brokerQueue = (Queue) amqDestination;
+        return brokerQueue.getConsumers().get(0);
+    }
+
+    /** Creates the JMS queue named by {@code destName} on the test session. */
+    @Override
+    protected Destination getDestination() throws JMSException {
+        return session.createQueue(destName);
+    }
+
+    /** Builds the ActiveMQ queue object for {@code destName}. */
+    @Override
+    protected ActiveMQDestination getActiveMQDestination() {
+        return new ActiveMQQueue(destName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/46055034/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
new file mode 100644
index 0000000..797d409
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.statistics;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Runs the inflight-message-size checks inherited from
+ * {@link AbstractInflightMessageSizeTest} against a non-durable topic subscription.
+ * The constructor arguments are supplied by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
+
+    /**
+     * @param ackType acknowledgement mode, forwarded to the base class
+     * @param optimizeAcknowledge optimize-acknowledge flag, forwarded to the base class
+     */
+    public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+        super(ackType, optimizeAcknowledge);
+    }
+
+    /** Creates a plain (non-durable) consumer on the test topic. */
+    @Override
+    protected MessageConsumer getMessageConsumer() throws JMSException {
+        return session.createConsumer(dest);
+    }
+
+    /** Returns the first consumer registered on the broker-side destination. */
+    @Override
+    protected Subscription getSubscription() {
+        return amqDestination.getConsumers().get(0);
+    }
+
+    /** Creates the JMS topic named by {@code destName} on the test session. */
+    @Override
+    protected Destination getDestination() throws JMSException {
+        final Destination jmsTopic = session.createTopic(destName);
+        return jmsTopic;
+    }
+
+    /** Builds the ActiveMQ topic object for {@code destName}. */
+    @Override
+    protected ActiveMQDestination getActiveMQDestination() {
+        return new ActiveMQTopic(destName);
+    }
+
+}


Mime
View raw message