activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-3233] respect policy entry blockedProducerWarningInterval for flow control warning, 0 disables and Xmillis makes it periodic, default period of 30s is not unlike the existing once behaviour. fix and tests
Date Fri, 03 Mar 2017 11:23:43 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2809befff -> 01b1f7f69


[AMQ-3233] respect policy entry blockedProducerWarningInterval for flow control warning, 0
disables and Xmillis makes it periodic, default period of 30s is not unlike the existing once
behaviour. fix and tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/01b1f7f6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/01b1f7f6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/01b1f7f6

Branch: refs/heads/master
Commit: 01b1f7f6945ba0897732b9eaae2ee1c9d50faf07
Parents: 2809bef
Author: gtully <gary.tully@gmail.com>
Authored: Fri Mar 3 11:23:23 2017 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Mar 3 11:23:23 2017 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java | 21 +++--
 .../apache/activemq/broker/region/Queue.java    |  7 +-
 .../apache/activemq/broker/region/Topic.java    |  3 +-
 .../activemq/ProducerFlowControlTest.java       | 50 +++++++++-
 .../usecases/TopicProducerFlowControlTest.java  | 97 +++++++++++++-------
 5 files changed, 131 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index fa5ae49..aa2f7b5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -68,7 +68,7 @@ public abstract class BaseDestination implements Destination {
     protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
     private boolean alwaysRetroactive = false;
-    protected boolean warnOnProducerFlowControl = true;
+    protected long lastBlockedProducerWarnTime = 0l;
     protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
 
     private int maxProducersToAudit = 1024;
@@ -683,7 +683,6 @@ public abstract class BaseDestination implements Destination {
             }
         } else {
             long start = System.currentTimeMillis();
-            long nextWarn = start;
             producerBrokerExchange.blockingOnFlowControl(true);
             destinationStatistics.getBlockedSends().increment();
             while (!usage.waitForSpace(1000, highWaterMark)) {
@@ -691,10 +690,8 @@ public abstract class BaseDestination implements Destination {
                     throw new IOException("Connection closed, send aborted.");
                 }
 
-                long now = System.currentTimeMillis();
-                if (now >= nextWarn) {
-                    getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning,
new Long(((now - start) / 1000))});
-                    nextWarn = now + blockedProducerWarningInterval;
+                if (isFlowControlLogRequired()) {
+                    getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning,
new Long(((System.currentTimeMillis() - start) / 1000))});
                 }
             }
             long finish = System.currentTimeMillis();
@@ -705,6 +702,18 @@ public abstract class BaseDestination implements Destination {
         }
     }
 
+    protected boolean isFlowControlLogRequired() {
+        boolean answer = false;
+        if (blockedProducerWarningInterval > 0) {
+            long now = System.currentTimeMillis();
+            if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) {
+                lastBlockedProducerWarnTime = now;
+                answer = true;
+            }
+        }
+        return answer;
+    }
+
     protected abstract Logger getLog();
 
     public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2b5c0c3..3ead89d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -628,12 +628,11 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
             isFull(context, memoryUsage);
             fastProducer(context, producerInfo);
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
-                if (warnOnProducerFlowControl) {
-                    warnOnProducerFlowControl = false;
+                if (isFlowControlLogRequired()) {
                     LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers
will be throttled to the rate at which messages are removed from this destination to prevent
flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
-                                    memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(),
destinationStatistics.getMessages().getCount());
-                }
+                                memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(),
destinationStatistics.getMessages().getCount());
 
+                }
                 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace())
{
                     throw new ResourceAllocationException("Usage Manager Memory Limit reached.
Stopping producer ("
                             + message.getProducerId() + ") to prevent flooding "

http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index c553e8c..8b46475 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -382,8 +382,7 @@ public class Topic extends BaseDestination implements Task {
 
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
 
-                if (warnOnProducerFlowControl) {
-                    warnOnProducerFlowControl = false;
+                if (isFlowControlLogRequired()) {
                     LOG.info("{}, Usage Manager memory limit reached {}. Producers will be
throttled to the rate at which messages are removed from this destination to prevent flooding
it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                             getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
index 70e15ec..7cf05d3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
@@ -20,7 +20,9 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -31,12 +33,18 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -247,6 +255,44 @@ public class ProducerFlowControlTest extends JmsTestSupport {
         assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
     }
 
+    public void testDisableWarning() throws Exception {
+        final AtomicInteger warnings = new AtomicInteger();
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage
Manager Memory Limit")) {
+                    LOG.info("received  log message: " + event.getMessage());
+                    warnings.incrementAndGet();
+                }
+            }
+        };
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(Queue.class);
+        log4jLogger.addAppender(appender);
+        try {
+            ConnectionFactory factory = createConnectionFactory();
+            connection = (ActiveMQConnection)factory.createConnection();
+            connections.add(connection);
+            connection.start();
+
+            fillQueue(queueB);
+            assertEquals(1, warnings.get());
+
+            broker.getDestinationPolicy().getDefaultEntry().setBlockedProducerWarningInterval(0);
+            warnings.set(0);
+
+            // new connection b/c other is blocked
+            connection = (ActiveMQConnection)factory.createConnection();
+            connections.add(connection);
+            connection.start();
+            fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy"));
+            assertEquals(0, warnings.get());
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+    }
+
     private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException
{
         final AtomicBoolean done = new AtomicBoolean(true);
         final AtomicBoolean keepGoing = new AtomicBoolean(true);
@@ -334,7 +380,9 @@ public class ProducerFlowControlTest extends JmsTestSupport {
     }
     
     protected void tearDown() throws Exception {
-        if (connection != null) {
+        for (Connection c : connections) {
+            // force error on blocked connections
+            ActiveMQConnection connection = (ActiveMQConnection) c;
             TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
             t.getTransportListener().onException(new IOException("Disposed."));
             connection.getTransport().stop();

http://git-wip-us.apache.org/repos/asf/activemq/blob/01b1f7f6/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 90b6c92..1574ec9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -33,10 +33,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +75,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
         tpe.setMemoryLimit(destinationMemLimit);
         tpe.setProducerFlowControl(true);
         tpe.setAdvisoryWhenFull(true);
+        tpe.setBlockedProducerWarningInterval(2000);
 
 
         pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe}));
@@ -128,49 +134,72 @@ public class TopicProducerFlowControlTest extends TestCase implements
MessageLis
             }
         });
 
-        // Start producing the test messages
-        final Session session = connectionFactory.createConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
-        final MessageProducer producer = session.createProducer(destination);
+        final AtomicInteger warnings = new AtomicInteger();
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage
Manager memory limit reached")) {
+                    LOG.info("received  log message: " + event.getMessage());
+                    warnings.incrementAndGet();
+                }
+            }
+        };
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(Topic.class);
+        log4jLogger.addAppender(appender);
+        try {
 
-        Thread producingThread = new Thread("Producing Thread") {
-            public void run() {
-                try {
-                    for (long i = 0; i < numMessagesToSend; i++) {
-                        producer.send(session.createTextMessage("test"));
+            // Start producing the test messages
+            final Session session = connectionFactory.createConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = session.createProducer(destination);
 
-                        long count = produced.incrementAndGet();
-                        if (count % 10000 == 0) {
-                            LOG.info("Produced " + count + " messages");
-                        }
-                    }
-                } catch (Throwable ex) {
-                    ex.printStackTrace();
-                } finally {
+            Thread producingThread = new Thread("Producing Thread") {
+                public void run() {
                     try {
-                        producer.close();
-                        session.close();
-                    } catch (Exception e) {
+                        for (long i = 0; i < numMessagesToSend; i++) {
+                            producer.send(session.createTextMessage("test"));
+
+                            long count = produced.incrementAndGet();
+                            if (count % 10000 == 0) {
+                                LOG.info("Produced " + count + " messages");
+                            }
+                        }
+                    } catch (Throwable ex) {
+                        ex.printStackTrace();
+                    } finally {
+                        try {
+                            producer.close();
+                            session.close();
+                        } catch (Exception e) {
+                        }
                     }
                 }
-            }
-        };
+            };
 
-        producingThread.start();
+            producingThread.start();
 
-        Wait.waitFor(new Wait.Condition() {
-            public boolean isSatisified() throws Exception {
-                return consumed.get() == numMessagesToSend;
-            }
-        }, 5 * 60 * 1000); // give it plenty of time before failing
+            Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return consumed.get() == numMessagesToSend;
+                }
+            }, 5 * 60 * 1000); // give it plenty of time before failing
 
-        assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
-        assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
+            assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
+            assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
 
-         assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
-             public boolean isSatisified() throws Exception {
-                 return blockedCounter.get() > 0;
-             }
-         }, 5 * 1000));
+            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return blockedCounter.get() > 0;
+                }
+            }, 5 * 1000));
+
+            LOG.info("BlockedCount: " + blockedCounter.get() + ", Warnings:" + warnings.get());
+            assertTrue("got a few warnings", warnings.get() > 1);
+            assertTrue("warning limited", warnings.get() < blockedCounter.get());
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
     }
 
     protected Destination createDestination(Session listenerSession) throws Exception {


Mime
View raw message