activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r935954 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/
Date Tue, 20 Apr 2010 15:13:18 GMT
Author: gtully
Date: Tue Apr 20 15:13:18 2010
New Revision: 935954

URL: http://svn.apache.org/viewvc?rev=935954&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2651 - modified patch applied with thanks,
did not change the default as there are a bunch of tests and user applications that depend
on the current default

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=935954&r1=935953&r2=935954&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Apr 20 15:13:18 2010
@@ -60,6 +60,7 @@ public abstract class PrefetchSubscripti
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
     protected int prefetchExtension;
+    protected boolean usePrefetchExtension = true;
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
@@ -257,7 +258,7 @@ public abstract class PrefetchSubscripti
                             // contract prefetch if dispatch required a pull
                             if (getPrefetchSize() == 0) {
                                 prefetchExtension = Math.max(0, prefetchExtension - index);
-                            } else if (context.isInTransaction()) {
+                            } else if (usePrefetchExtension && context.isInTransaction())
{
                                 // extend prefetch window only if not a pulling consumer
                                 prefetchExtension = Math.max(prefetchExtension, index);
                             }
@@ -307,7 +308,9 @@ public abstract class PrefetchSubscripti
                         node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        prefetchExtension = Math.max(prefetchExtension, index + 1);
+                        if (usePrefetchExtension) {
+                            prefetchExtension = Math.max(prefetchExtension, index + 1);
+                        }
                         destination = node.getRegionDestination();
                         callDispatchMatched = true;
                         break;
@@ -746,4 +749,12 @@ public abstract class PrefetchSubscripti
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
+    
+    public boolean isUsePrefetchExtension() {
+        return usePrefetchExtension;
+    }
+
+    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
+        this.usePrefetchExtension = usePrefetchExtension;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=935954&r1=935953&r2=935954&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Tue Apr 20 15:13:18 2010
@@ -83,7 +83,8 @@ public class PolicyEntry extends Destina
     private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
     private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
     private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
-    private int cursorMemoryHighWaterMark=70;
+    private boolean usePrefetchExtension = true;
+    private int cursorMemoryHighWaterMark = 70;
     private int storeUsageHighWaterMark = 100;
     
    
@@ -195,7 +196,7 @@ public class PolicyEntry extends Destina
         }
         sub.setMaxAuditDepth(getMaxAuditDepth());
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
-        
+        sub.setUsePrefetchExtension(isUsePrefetchExtension());        
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription
sub) {
@@ -207,6 +208,7 @@ public class PolicyEntry extends Destina
             sub.setPrefetchSize(getQueueBrowserPrefetch());
         }
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+        sub.setUsePrefetchExtension(isUsePrefetchExtension());
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub)
{
@@ -218,6 +220,7 @@ public class PolicyEntry extends Destina
             sub.setPrefetchSize(getQueuePrefetch());
         }
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+        sub.setUsePrefetchExtension(isUsePrefetchExtension());
     }
 
     // Properties
@@ -692,12 +695,20 @@ public class PolicyEntry extends Destina
         this.durableTopicPrefetch = durableTopicPrefetch;
     }
     
+    public boolean isUsePrefetchExtension() {
+        return this.usePrefetchExtension;
+    }
+
+    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
+        this.usePrefetchExtension = usePrefetchExtension;
+    }
+    
     public int getCursorMemoryHighWaterMark() {
-		return this.cursorMemoryHighWaterMark;
-	}
+        return this.cursorMemoryHighWaterMark;
+    }
 
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
-		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
 	}
 
     public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=935954&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
Tue Apr 20 15:13:18 2010
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+// see: https://issues.apache.org/activemq/browse/AMQ-2651
+public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
+    private static final Log LOG = LogFactory.getLog(OnePrefetchAsyncConsumerTest.class);
+
+    private TestMutex testMutex;
+    protected Connection connection;
+    protected ConnectionConsumer connectionConsumer;
+    protected Queue queue;
+    protected CountDownLatch messageTwoDelay = new CountDownLatch(1);
+
+    public void testPrefetchExtension() throws Exception {
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its
prefetchExtension
+        producer.send(session.createTextMessage("Msg1"));
+
+        // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession)
+        producer.send(session.createTextMessage("Msg2"));
+
+        // Msg3 will cause the test to fail as it will attempt to retrieve an additional
ServerSession from
+        // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension
in the PrefetchSubscription
+        producer.send(session.createTextMessage("Msg3"));
+        
+        session.commit();
+        
+        // wait for test to complete and the test result to get set
+        // this happens asynchronously since the messages are delivered asynchronously
+        synchronized (testMutex) {
+           while (!testMutex.testCompleted) {
+              testMutex.wait();
+           }
+        }
+        
+        //test completed, result is ready
+        assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful);
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:61616";
+        super.setUp();
+
+        testMutex = new TestMutex();
+        connection = createConnection();
+        queue = createQueue();
+        // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription
+        connectionConsumer = connection.createConnectionConsumer(
+           queue, null, new TestServerSessionPool(connection), 1);
+        connection.start();
+    }
+
+    protected void tearDown() throws Exception {
+        connectionConsumer.close();
+        connection.close();
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        // ensure prefetch is exact. only delivery next when current is acked
+        defaultEntry.setUsePrefetchExtension(false);
+        policyMap.setDefaultEntry(defaultEntry);
+        answer.setDestinationPolicy(policyMap);
+        return answer;
+    }
+    
+    protected Queue createQueue() {
+        return new ActiveMQQueue(getDestinationString());
+    }
+
+    // simulates a ServerSessionPool with only 1 ServerSession
+    private class TestServerSessionPool implements ServerSessionPool {
+         Connection connection;
+         TestServerSession serverSession;
+         boolean serverSessionInUse = false;
+
+         public TestServerSessionPool(Connection connection) throws JMSException {
+             this.connection = connection;
+             serverSession = new TestServerSession(this);
+         }
+
+         public ServerSession getServerSession() throws JMSException {
+             synchronized (this) {
+                 if (serverSessionInUse) {
+                     LOG.info("asked for session while in use, not serialised delivery");
+                     synchronized (testMutex) {
+                        testMutex.testSuccessful = false;
+                        testMutex.testCompleted = true;
+                     }
+                 }
+                 serverSessionInUse = true;
+                 return serverSession;
+             }
+         }
+    }
+
+    private class TestServerSession implements ServerSession {
+         TestServerSessionPool pool;
+         Session session;
+
+         public TestServerSession(TestServerSessionPool pool) throws JMSException {
+             this.pool = pool;
+             session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+             session.setMessageListener(new TestMessageListener());
+         }
+
+         public Session getSession() throws JMSException {
+             return session;
+         }
+
+         public void start() throws JMSException {
+             // use a separate thread to process the message asynchronously
+             new Thread() {
+                 public void run() {
+                     // let the session deliver the message
+                     session.run();
+
+                     // commit the tx
+                     try {
+                         session.commit();
+                     }
+                     catch (JMSException e) {
+                     }
+
+                     // return ServerSession to pool
+                     synchronized (pool) {
+                         pool.serverSessionInUse = false;
+                     }
+
+                     // let the test check if the test was completed
+                     synchronized (testMutex) {
+                         testMutex.notify();
+                     }
+                 }
+              }.start();
+         }
+    }
+
+    private class TestMessageListener implements MessageListener {
+        public void onMessage(Message message) {
+            try {
+               String text = ((TextMessage)message).getText();
+               LOG.info("got message: " + text);
+               if (text.equals("Msg3")) {
+                  // if we get here, Exception in getServerSession() was not thrown, test
is successful
+                  // this obviously doesn't happen now,
+                  // need to fix prefetchExtension computation logic in PrefetchSubscription
to get here
+                  synchronized (testMutex) {
+                      if (!testMutex.testCompleted) {
+                          testMutex.testSuccessful = true;
+                          testMutex.testCompleted = true;
+                      }
+                  }
+               }
+               else if (text.equals("Msg2")) {
+                  // simulate long message processing so that Msg3 comes when Msg2 is still
being processed
+                  // and thus the single ServerSession is in use
+                  TimeUnit.SECONDS.sleep(4);
+               }
+            }
+            catch (JMSException e) {
+            }
+            catch (InterruptedException e) {
+            }
+        }
+    }
+
+    private class TestMutex {
+        boolean testCompleted = false;
+        boolean testSuccessful = true;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message