activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r831377 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
Date Fri, 30 Oct 2009 16:43:05 GMT
Author: gtully
Date: Fri Oct 30 16:43:05 2009
New Revision: 831377

URL: http://svn.apache.org/viewvc?rev=831377&view=rev
Log:
add variant that uses syncSend and verifies a ResourceAllocationException in the sending thread

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java?rev=831377&r1=831376&r2=831377&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java	Fri Oct 30 16:43:05 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.ConnectionFactory;
@@ -111,6 +112,50 @@
         keepGoing.set(false);
     }
 
+    /** Sync-send variant (useAsyncSend=false): counts ResourceAllocationExceptions raised in the sending thread. */
+    public void testPubisherRecoverAfterBlockWithSyncSend() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setExceptionListener(null);
+        factory.setUseAsyncSend(false);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        final AtomicInteger exceptionCount = new AtomicInteger(0);
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                    } catch (ResourceAllocationException expected) {
+                        gotResourceException.set(true);
+                        exceptionCount.incrementAndGet();
+                    } catch (JMSException ignored) {
+                        // only resource-limit failures are counted; keep sending
+                    }
+                }
+            }
+        };
+        thread.start();
+        try {
+            waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+            // resourceException on second message, resumption if we can receive 10
+            MessageConsumer consumer = session.createConsumer(queueA);
+            for (int idx = 0; idx < 10; ++idx) {
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    msg.acknowledge();
+                }
+            }
+            assertTrue("we were blocked more than 5 times", 5 < exceptionCount.get());
+        } finally {
+            keepGoing.set(false); // always stop the filler, even when an assertion fails
+        }
+    }
     
 	@Override
 	protected ConnectionFactory createConnectionFactory() throws Exception {



Mime
View raw message