activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1422873 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-client/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/
Date Mon, 17 Dec 2012 11:48:02 GMT
Author: gtully
Date: Mon Dec 17 11:48:01 2012
New Revision: 1422873

URL: http://svn.apache.org/viewvc?rev=1422873&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4224 - have broker update zero prefetch on consumer
so that it can issue a pull request as appropriate

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1422873&r1=1422872&r2=1422873&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java	(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java	Mon Dec 17 11:48:01 2012
@@ -260,6 +260,10 @@ public class PolicyEntry extends Destina
         
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
             sub.setPrefetchSize(getQueuePrefetch());
+            if (sub.getPrefetchSize() == 0) {
+                // tell the sub so that it can issue a pull request
+                sub.updateConsumerPrefetch(0);
+            }
         }
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         sub.setUsePrefetchExtension(isUsePrefetchExtension());

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1422873&r1=1422872&r2=1422873&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java	(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java	Mon Dec 17 11:48:01 2012
@@ -824,7 +824,7 @@ public class ActiveMQMessageConsumer imp
      */
     protected void sendPullCommand(long timeout) throws JMSException {
         clearDispatchList();
-        if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
+        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
             messagePull.setTimeout(timeout);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?rev=1422873&r1=1422872&r2=1422873&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java	Mon Dec 17 11:48:01 2012
@@ -26,6 +26,10 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.spring.SpringConsumer;
 import org.slf4j.Logger;
@@ -40,6 +44,7 @@ public class ZeroPrefetchConsumerTest ex
 
     protected Connection connection;
     protected Queue queue;
+    protected Queue brokerZeroQueue = new ActiveMQQueue("brokerZeroConfig");
 
     public void testCannotUseMessageListener() throws Exception {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -313,7 +318,31 @@ public class ZeroPrefetchConsumerTest ex
         answer = (TextMessage)consumer.receiveNoWait();
         assertNull("Should have not received a message!", answer);
     }
-    
+
+    // https://issues.apache.org/jira/browse/AMQ-4224
+    public void testBrokerZeroPrefetchConfig() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(brokerZeroQueue);
+        producer.send(session.createTextMessage("Msg1"));
+        // now lets receive it
+        MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
+
+        TextMessage answer = (TextMessage)consumer.receive(5000);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
+        zeroPrefetchPolicy.setQueuePrefetch(0);
+        policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        return brokerService;
+    }
+
     protected void setUp() throws Exception {
         bindAddress = "tcp://localhost:0";
         super.setUp();



Mime
View raw message