activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1379433 - in /activemq/trunk/activemq-ra/src: main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java test/java/org/apache/activemq/ra/MDBTest.java
Date Fri, 31 Aug 2012 12:53:54 GMT
Author: dejanb
Date: Fri Aug 31 12:53:53 2012
New Revision: 1379433

URL: http://svn.apache.org/viewvc?rev=1379433&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3986 - respect prefetch values for MDBs

Modified:
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
(original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
Fri Aug 31 12:53:53 2012
@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -97,67 +98,76 @@ public class ActiveMQEndpointWorker {
             public void run() {
                 currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                 MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
-                if ( LOG.isInfoEnabled() ) {
+                if (LOG.isInfoEnabled()) {
                     LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl()
+ "]");
                 }
 
-                while ( connecting.get() && running ) {
-                try {
-                    connection = adapter.makeConnection(activationSpec);
-                    connection.setExceptionListener(new ExceptionListener() {
-                        public void onException(JMSException error) {
-                            if (!serverSessionPool.isClosing()) {
+                while (connecting.get() && running) {
+                    try {
+                        connection = adapter.makeConnection(activationSpec);
+                        connection.setExceptionListener(new ExceptionListener() {
+                            public void onException(JMSException error) {
+                                if (!serverSessionPool.isClosing()) {
                                     // initiate reconnection only once, i.e. on initial exception
                                     // and only if not already trying to connect
                                     LOG.error("Connection to broker failed: " + error.getMessage(),
error);
-                                    if ( connecting.compareAndSet(false, true) ) {
-                                        synchronized ( connectWork ) {
+                                    if (connecting.compareAndSet(false, true)) {
+                                        synchronized (connectWork) {
                                             disconnect();
                                             serverSessionPool.closeIdleSessions();
                                             connect();
-                            }
+                                        }
                                     } else {
                                         // connection attempt has already been initiated
                                         LOG.info("Connection attempt already in progress,
ignoring connection exception");
-                        }
+                                    }
                                 }
                             }
-                    });
+                        });
                         connection.start();
 
-                        int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue()
* activationSpec.getMaxSessionsIntValue();
-                    if (activationSpec.isDurableSubscription()) {
+                        if (activationSpec.isDurableSubscription()) {
                             consumer = connection.createDurableConnectionConsumer(
                                     (Topic) dest,
-                                    activationSpec.getSubscriptionName(), 
+                                    activationSpec.getSubscriptionName(),
                                     emptyToNull(activationSpec.getMessageSelector()),
-                                    serverSessionPool, 
-                                    prefetchSize,
+                                    serverSessionPool,
+                                    connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                     activationSpec.getNoLocalBooleanValue());
-                    } else {
+                        } else {
                             consumer = connection.createConnectionConsumer(
-                                    dest, 
-                                    emptyToNull(activationSpec.getMessageSelector()), 
-                                    serverSessionPool, 
-                                    prefetchSize,
-                                                                       activationSpec.getNoLocalBooleanValue());
-                    }
+                                    dest,
+                                    emptyToNull(activationSpec.getMessageSelector()),
+                                    serverSessionPool,
+                                    getPrefetch(activationSpec, connection, dest),
+                                    activationSpec.getNoLocalBooleanValue());
+                        }
 
 
-                        if ( connecting.compareAndSet(true, false) ) {
-                            if ( LOG.isInfoEnabled() ) {
+                        if (connecting.compareAndSet(true, false)) {
+                            if (LOG.isInfoEnabled()) {
                                 LOG.info("Successfully established connection to broker ["
+ adapter.getInfo().getServerUrl() + "]");
                             }
                         } else {
                             LOG.error("Could not release connection lock");
                         }
-                } catch (JMSException error) {
-                        if ( LOG.isDebugEnabled() ) {
+                    } catch (JMSException error) {
+                        if (LOG.isDebugEnabled()) {
                             LOG.debug("Failed to connect: " + error.getMessage(), error);
-                }
+                        }
                         disconnect();
                         pause(error);
+                    }
+                }
             }
+
+            private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection
connection, ActiveMQDestination destination) {
+                if (destination.isTopic()) {
+                    return connection.getPrefetchPolicy().getTopicPrefetch();
+                } else if (destination.isQueue()) {
+                    return connection.getPrefetchPolicy().getQueuePrefetch();
+                } else {
+                    return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                 }
             }
             

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Fri Aug 31
12:53:53 2012
@@ -48,7 +48,12 @@ import javax.transaction.xa.Xid;
 
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
 
 public class MDBTest extends TestCase {
 
@@ -133,10 +138,14 @@ public class MDBTest extends TestCase {
 
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         Connection connection = factory.createConnection();
+        connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new
ActiveMQQueue("TEST")));
+
         ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
         adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.setQueuePrefetch(1);
         adapter.start(new StubBootstrapContext());
 
         final CountDownLatch messageDelivered = new CountDownLatch(1);
@@ -168,16 +177,17 @@ public class MDBTest extends TestCase {
         // Activate an Endpoint
         adapter.endpointActivation(messageEndpointFactory, activationSpec);
 
-        // Give endpoint a chance to setup and register its listeners
-        try {
-            Thread.sleep(1000);
-        } catch (Exception e) {
-
+        ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+        if (msg != null) {
+            assertEquals("Prefetch size hasn't been set", 1, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
+        } else {
+            fail("Consumer hasn't been created");
         }
 
         // Send the broker a message to that endpoint
         MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
         producer.send(session.createTextMessage("Hello!"));
+
         connection.close();
 
         // Wait for the message to be delivered.



Mime
View raw message