activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r895977 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/usage/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java...
Date Tue, 05 Jan 2010 10:44:50 GMT
Author: gtully
Date: Tue Jan  5 10:44:45 2010
New Revision: 895977

URL: http://svn.apache.org/viewvc?rev=895977&view=rev
Log:
svn merge -c 895975 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2523
- have Usage use a shared executor with a limited pool of 10 threads and an unbounded queue.
Fix potential dropped dispatch attempts when messages waiting for space fail due to memory
again being full, can result is hung consumer. In this case, a new notification for not full
needs to be registered. Added relevant test case.

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usage/
      - copied from r895975, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
      - copied unchanged from r895975, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
    activemq/branches/activemq-5.3/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=895977&r1=895976&r2=895977&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Jan  5 10:44:45 2010
@@ -406,8 +406,7 @@
                 }
 
                 // We can avoid blocking due to low usage if the producer is sending
-                // a sync message or
-                // if it is using a producer window
+                // a sync message or if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired())
{
                     // copy the exchange state since the context will be modified while we
are waiting
                     // for space.
@@ -441,17 +440,14 @@
                                         ExceptionResponse response = new ExceptionResponse(e);
                                         response.setCorrelationId(message.getCommandId());
                                         context.getConnection().dispatchAsync(response);
+                                    } else {
+                                        LOG.debug("unexpected exception on deferred send
of :" + message, e);
                                     }
                                 }
                             }
                         });
 
-                        // If the user manager is not full, then the task will not
-                        // get called..
-                        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask))
{
-                            // so call it directly here.
-                            sendMessagesWaitingForSpaceTask.run();
-                        }
+                        registerCallbackForNotFullNotification();
                         context.setDontSendReponse(true);
                         return;
                     }
@@ -482,6 +478,15 @@
         }
     }
 
+    private void registerCallbackForNotFullNotification() {
+        // If the usage manager is not full, then the task will not
+        // get called..
+        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+            // so call it directly here.
+            sendMessagesWaitingForSpaceTask.run();
+        }
+    }
+
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         synchronized (sendLock) {
@@ -1069,9 +1074,14 @@
 
             // do early to allow dispatch of these waiting messages
             synchronized (messagesWaitingForSpace) {
-                while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull())
{
-                    Runnable op = messagesWaitingForSpace.removeFirst();
-                    op.run();
+                while (!messagesWaitingForSpace.isEmpty()) {
+                    if (!memoryUsage.isFull()) {
+                        Runnable op = messagesWaitingForSpace.removeFirst();
+                        op.run();
+                    } else {
+                        registerCallbackForNotFullNotification();
+                        break;
+                    }
                 }
             }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=895977&r1=895976&r2=895977&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
Tue Jan  5 10:44:45 2010
@@ -43,6 +43,7 @@
 public abstract class Usage<T extends Usage> implements Service {
 
     private static final Log LOG = LogFactory.getLog(Usage.class);
+    private static ThreadPoolExecutor executor;
     protected final Object usageMutex = new Object();
     protected int percentUsage;
     protected T parent;
@@ -55,7 +56,7 @@
     private List<T> children = new CopyOnWriteArrayList<T>();
     private final List<Runnable> callbacks = new LinkedList<Runnable>();
     private int pollingTime = 100;
-    private volatile ThreadPoolExecutor executor;
+    
     private AtomicBoolean started=new AtomicBoolean();
 
     public Usage(T parent, String name, float portion) {
@@ -247,28 +248,30 @@
             if (oldPercentUsage >= 100 && newPercentUsage < 100) {
                 synchronized (usageMutex) {
                     usageMutex.notifyAll();
-                    for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator();
iter.hasNext();) {
-                        Runnable callback = iter.next();
-                        getExecutor().execute(callback);
+                    if (!callbacks.isEmpty()) {
+                        for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator();
iter.hasNext();) {
+                            Runnable callback = iter.next();
+                            getExecutor().execute(callback);
+                        }
+                        callbacks.clear();
                     }
-                    callbacks.clear();
                 }
             }
-            // Let the listeners know on a separate thread
-            Runnable listenerNotifier = new Runnable() {
-            
-                public void run() {
-                    for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();)
{
-                        UsageListener l = iter.next();
-                        l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+            if (!listeners.isEmpty()) {
+                // Let the listeners know on a separate thread
+                Runnable listenerNotifier = new Runnable() {
+                    public void run() {
+                        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();)
{
+                            UsageListener l = iter.next();
+                            l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+                        }
                     }
+                };
+                if (started.get()) {
+                    getExecutor().execute(listenerNotifier);
+                } else {
+                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
                 }
-            
-            };
-            if (started.get()) {
-                getExecutor().execute(listenerNotifier);
-            } else {
-                LOG.warn("Not notifying memory usage change to listeners on shutdown");
             }
         }
     }
@@ -299,9 +302,7 @@
             if (parent != null) {
                 parent.removeChild(this);
             }
-            if (this.executor != null){
-                this.executor.shutdownNow();
-            }
+            
             //clear down any callbacks
             synchronized (usageMutex) {
                 usageMutex.notifyAll();
@@ -402,22 +403,17 @@
     }
     
     protected Executor getExecutor() {
-        if (this.executor == null) {
-        	synchronized(usageMutex) {
-        		if (this.executor == null) {
-		            this.executor = new ThreadPoolExecutor(1, 1, 0,
-		                    TimeUnit.NANOSECONDS,
-		                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-		                        public Thread newThread(Runnable runnable) {
-		                            Thread thread = new Thread(runnable, getName()
-		                                    + " Usage Thread Pool");
-		                            thread.setDaemon(true);
-		                            return thread;
-		                        }
-		                    });
-        		}
-        	}
-        }
-        return this.executor;
+        return executor;
     }
+    
+    static {
+        executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Usage Async Task");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java?rev=895977&r1=895976&r2=895977&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
Tue Jan  5 10:44:45 2010
@@ -62,6 +62,11 @@
     }
     
     @Override
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        // sendFail means no flowControllwindow as there is no producer ack, just an exception
+    }
+    
+    @Override
     public void testPubisherRecoverAfterBlock() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
         // with sendFail, there must be no flowControllwindow

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?rev=895977&r1=895976&r2=895977&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Tue Jan  5 10:44:45 2010
@@ -37,9 +37,11 @@
 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class ProducerFlowControlTest extends JmsTestSupport {
-
+    static final Log LOG = LogFactory.getLog(ProducerFlowControlTest.class);
     ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
     ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
     protected TransportConnector connector;
@@ -80,8 +82,6 @@
 
     public void testPubisherRecoverAfterBlock() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
-        factory.setProducerWindowSize(1024 * 64);
-        factory.setUseAsyncSend(true);
         connection = (ActiveMQConnection)factory.createConnection();
         connections.add(connection);
         connection.start();
@@ -94,12 +94,14 @@
         
    
 		Thread thread = new Thread("Filler") {
+		    int i;
 			@Override
 			public void run() {
                 while (keepGoing.get()) {
                     done.set(false);
                     try {
-						producer.send(session.createTextMessage("Test message"));
+						producer.send(session.createTextMessage("Test message " + ++i));
+						LOG.info("sent: " + i);
 					} catch (JMSException e) {
 					}
                 }
@@ -114,14 +116,63 @@
         TextMessage msg;
         for (int idx = 0; idx < 5; ++idx) {
         	msg = (TextMessage) consumer.receive(1000);
+        	LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
         	msg.acknowledge();
         }
         Thread.sleep(1000);
         keepGoing.set(false);
     	
-		assertFalse(done.get());
+		assertFalse("producer has resumed", done.get());
     }
-    
+
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 5);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message " + ++i));
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            assertNotNull("Got a message", msg);
+            LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+            msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+        
+        assertFalse("producer has resumed", done.get());
+    }
+
     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
         factory.setAlwaysSyncSend(true);

Modified: activemq/branches/activemq-5.3/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java?rev=895977&r1=895976&r2=895977&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
Tue Jan  5 10:44:45 2010
@@ -36,7 +36,8 @@
         //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
         super.setUp();
 
-        Thread.sleep(1000);
+        Thread.sleep(2000);
+        Thread.yield();
     }
 
     protected void tearDown() throws Exception {



Mime
View raw message