activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560
Date Mon, 15 Dec 2014 16:37:16 GMT
https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2d9959a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2d9959a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2d9959a6

Branch: refs/heads/trunk
Commit: 2d9959a6f6f33f7138606073e425a74261ec3125
Parents: 411c754
Author: gtully <gary.tully@gmail.com>
Authored: Mon Dec 15 14:12:08 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Dec 15 14:21:47 2014 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     | 60 +++++++++-----------
 .../activemq/ActiveMQConnectionFactory.java     |  2 +-
 .../activemq/ZeroPrefetchConsumerTest.java      | 14 +++--
 3 files changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8b8a788..b101d72 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -234,26 +234,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
-                            // contract prefetch if dispatch required a pull
-                            if (getPrefetchSize() == 0) {
-                                // Protect extension update against parallel updates.
-                                while (true) {
-                                    int currentExtension = prefetchExtension.get();
-                                    int newExtension = Math.max(0, currentExtension - index);
-                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                        break;
-                                    }
-                                }
-                            } else if (usePrefetchExtension && context.isInTransaction()) {
-                                // extend prefetch window only if not a pulling consumer
-                                while (true) {
-                                    int currentExtension = prefetchExtension.get();
-                                    int newExtension = Math.max(currentExtension, index);
-                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                        break;
-                                    }
-                                }
-                            }
                             destination = (Destination) node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
@@ -283,14 +263,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             registerRemoveSync(context, node);
                         }
 
-                        // Protect extension update against parallel updates.
-                        while (true) {
-                            int currentExtension = prefetchExtension.get();
-                            int newExtension = Math.max(0, currentExtension - 1);
-                            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                break;
+                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
+                            // allow transaction batch to exceed prefetch
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(currentExtension, currentExtension + 1);
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                                    break;
+                                }
                             }
                         }
+
                         acknowledge(context, ack, node);
                         destination = (Destination) node.getRegionDestination();
                         callDispatchMatched = true;
@@ -313,7 +296,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         nodeDest.getDestinationStatistics().getInflight().decrement();
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        if (usePrefetchExtension) {
+                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                            // allow  batch to exceed prefetch
                             while (true) {
                                 int currentExtension = prefetchExtension.get();
                                 int newExtension = Math.max(currentExtension, index + 1);
@@ -426,6 +410,19 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 new Synchronization() {
 
                     @Override
+                    public void beforeEnd() {
+                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(0, currentExtension - 1);
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
                     public void afterCommit()
                             throws Exception {
                         Destination nodeDest = (Destination) node.getRegionDestination();
@@ -516,7 +513,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
      */
     @Override
     public boolean isFull() {
-        return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
+        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
     }
 
     /**
@@ -537,7 +534,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
     @Override
     public int countBeforeFull() {
-        return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
+        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
     }
 
     @Override
@@ -696,13 +693,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
         okForAckAsDispatchDone.countDown();
 
-        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
         MessageDispatch md = createMessageDispatch(node, message);
-        // NULL messages don't count... they don't get Acked.
         if (node != QueueMessageReference.NULL_MESSAGE) {
             dispatchCounter++;
             dispatched.add(node);
-        } else {
+        }
+        if (getPrefetchSize() == 0) {
             while (true) {
                 int currentExtension = prefetchExtension.get();
                 int newExtension = Math.max(0, currentExtension - 1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 3354ab3..1fbf604 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -173,7 +173,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
     private long consumerFailoverRedeliveryWaitPeriod = 0;
     private boolean checkForDuplicates = true;
     private ClientInternalExceptionListener clientInternalExceptionListener;
-    private boolean messagePrioritySupported = true;
+    private boolean messagePrioritySupported = false;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
     private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;

http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index acf9c03..d4cecab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -174,7 +174,7 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
     }
 
     private void doTestManyMessageConsumer(boolean transacted) throws Exception {
-        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));
@@ -221,12 +221,11 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
             session.commit();
         }
         // Now using other consumer
-        // this call should return the next message (Msg5) still left on the queue
+        // this call should return the next message still left on the queue
         answer = (TextMessage)consumer.receive(5000);
         assertEquals("Should have received a message!", answer.getText(), "Msg6");
         // read one more message without commit
-        // Now using other consumer
-        // this call should return the next message (Msg5) still left on the queue
+        // this call should return the next message still left on the queue
         answer = (TextMessage)consumer.receive(5000);
         assertEquals("Should have received a message!", answer.getText(), "Msg7");
         if (transacted) {
@@ -247,12 +246,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
         doTestManyMessageConsumerWithSend(true);
     }
 
+    public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception {
+        ((ActiveMQConnection)connection).setMessagePrioritySupported(true);
+        doTestManyMessageConsumerWithSend(true);
+    }
+
     public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
         doTestManyMessageConsumerWithSend(false);
     }
 
     private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
-        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));


Mime
View raw message