activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1422928 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ activemq-ra/src/main/java/org/apache/activemq/ra/ activemq-ra/src/test/java/org/apache/activemq/ra/
Date Mon, 17 Dec 2012 13:43:15 GMT
Author: gtully
Date: Mon Dec 17 13:43:14 2012
New Revision: 1422928

URL: http://svn.apache.org/viewvc?rev=1422928&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4226 - ensure potential error condition (starved
rar endpoint listener) is flagged in log

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Mon Dec 17 13:43:14 2012
@@ -2337,6 +2337,12 @@ public class ActiveMQConnection implemen
             for (ActiveMQSession session : this.sessions) {
                 session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
             }
+            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
+                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
+                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
+                    consumerInfo.setPrefetchSize(command.getPrefetch());
+                }
+            }
         }
     }
 

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
Mon Dec 17 13:43:14 2012
@@ -28,6 +28,7 @@ import javax.jms.ServerSession;
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageDispatch;
 
@@ -76,7 +77,7 @@ public class ActiveMQConnectionConsumer 
 
         this.connection.addConnectionConsumer(this);
         this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
-        this.connection.asyncSendPacket(this.consumerInfo);
+        this.connection.syncSendPacket(this.consumerInfo);
     }
 
     /**
@@ -160,4 +161,8 @@ public class ActiveMQConnectionConsumer 
         // Till there is a need, lets immediately allow dispatch
         this.connection.transportInterruptionProcessingComplete();
     }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
 }

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
(original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
Mon Dec 17 13:43:14 2012
@@ -35,6 +35,7 @@ import javax.resource.spi.work.WorkExcep
 import javax.resource.spi.work.WorkManager;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionConsumer;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -74,7 +75,7 @@ public class ActiveMQEndpointWorker {
     private final Object shutdownMutex = new String("shutdownMutex");
     
     private ActiveMQConnection connection;
-    private ConnectionConsumer consumer;
+    private ActiveMQConnectionConsumer consumer;
     private ServerSessionPoolImpl serverSessionPool;
     private boolean running;
 
@@ -127,7 +128,7 @@ public class ActiveMQEndpointWorker {
                         connection.start();
 
                         if (activationSpec.isDurableSubscription()) {
-                            consumer = connection.createDurableConnectionConsumer(
+                            consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
                                     (Topic) dest,
                                     activationSpec.getSubscriptionName(),
                                     emptyToNull(activationSpec.getMessageSelector()),
@@ -135,7 +136,7 @@ public class ActiveMQEndpointWorker {
                                     connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                     activationSpec.getNoLocalBooleanValue());
                         } else {
-                            consumer = connection.createConnectionConsumer(
+                            consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
                                     dest,
                                     emptyToNull(activationSpec.getMessageSelector()),
                                     serverSessionPool,
@@ -151,6 +152,11 @@ public class ActiveMQEndpointWorker {
                         } else {
                             LOG.error("Could not release connection lock");
                         }
+
+                        if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
+                            LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
+                        }
+
                     } catch (JMSException error) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Failed to connect: " + error.getMessage(), error);

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
(original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
Mon Dec 17 13:43:14 2012
@@ -181,7 +181,7 @@ public class ActiveMQResourceAdapter ext
         }
 
         if (!(activationSpec instanceof MessageActivationSpec)) {
-            throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
+            throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
         }
 
         ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Mon Dec 17
13:43:14 2012
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
 import java.util.Timer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -49,9 +50,19 @@ import javax.transaction.xa.Xid;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.ErrorHandler;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LoggingEvent;
 
 public class MDBTest extends TestCase {
 
@@ -197,6 +208,147 @@ public class MDBTest extends TestCase {
 
     }
 
+    public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception {
+
+        final BrokerService brokerService = new BrokerService();
+        final String brokerUrl = "vm://zeroPrefetch?create=false";
+        brokerService.setBrokerName("zeroPrefetch");
+        brokerService.setPersistent(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
+        zeroPrefetchPolicy.setQueuePrefetch(0);
+        policyMap.setDefaultEntry(zeroPrefetchPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        brokerService.start();
+
+        final AtomicReference<String> errorMessage = new AtomicReference<String>();
+        final Appender testAppender = new Appender() {
+
+            @Override
+            public void addFilter(Filter filter) {
+            }
+
+            @Override
+            public Filter getFilter() {
+                return null;
+            }
+
+            @Override
+            public void clearFilters() {
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
+                    errorMessage.set(event.getRenderedMessage());
+                }
+            }
+
+            @Override
+            public String getName() {
+                return null;
+            }
+
+            @Override
+            public void setErrorHandler(ErrorHandler errorHandler) {
+            }
+
+            @Override
+            public ErrorHandler getErrorHandler() {
+                return null;
+            }
+
+            @Override
+            public void setLayout(Layout layout) {
+            }
+
+            @Override
+            public Layout getLayout() {
+                return null;
+            }
+
+            @Override
+            public void setName(String s) {
+            }
+
+            @Override
+            public boolean requiresLayout() {
+                return false;
+            }
+        };
+        LogManager.getRootLogger().addAppender(testAppender);
+
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer advisory = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQQueue("TEST")));
+
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl(brokerUrl);
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(1);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+            public void onMessage(Message message) {
+                super.onMessage(message);
+                messageDelivered.countDown();
+            };
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return true;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+        ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+        if (msg != null) {
+            assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
+        } else {
+            fail("Consumer hasn't been created");
+        }
+
+        // Send the broker a message to that endpoint
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+        producer.send(session.createTextMessage("Hello!"));
+
+        connection.close();
+
+        // Wait for the message to be delivered.
+        assertFalse(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
+
+        // Shut the Endpoint down.
+        adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+        adapter.stop();
+
+        assertNotNull("We got an error message", errorMessage.get());
+        assertTrue("correct message", errorMessage.get().contains("zero"));
+
+        LogManager.getRootLogger().removeAppender(testAppender);
+        brokerService.stop();
+    }
 
     public void testMessageExceptionReDelivery() throws Exception {
 

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java?rev=1422928&r1=1422927&r2=1422928&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java
(original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/ManagedConnectionFactoryTest.java
Mon Dec 17 13:43:14 2012
@@ -88,6 +88,8 @@ public class ManagedConnectionFactoryTes
         assertTrue(connection != null);
         assertTrue(connection instanceof ManagedConnectionProxy);
 
+        connection.close();
+
     }
 
     public void testConnectionFactoryConnectionMatching() throws ResourceException, JMSException {
@@ -123,6 +125,9 @@ public class ManagedConnectionFactoryTes
         test = managedConnectionFactory.matchManagedConnections(set, null, ri2);
         assertTrue(connection2 == test);
 
+        for (ManagedConnection managedConnection : set) {
+            managedConnection.destroy();
+        }
     }
 
     public void testConnectionFactoryIsSerializableAndReferenceable() throws ResourceException, JMSException {



Mime
View raw message