activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r900318 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQMessageConsumer.java main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/bugs/AMQ2102Test.java
Date Mon, 18 Jan 2010 09:43:22 GMT
Author: gtully
Date: Mon Jan 18 09:43:21 2010
New Revision: 900318

URL: http://svn.apache.org/viewvc?rev=900318&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2572 - issue with unconsumed prefetched
messages being ignored as duplicates - timing issue with consumer close and session dispatch

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=900318&r1=900317&r2=900318&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Jan 18 09:43:21 2010
@@ -729,6 +729,8 @@
                     deliveredMessages.clear();
                 }
             }
+            unconsumedMessages.close();
+            this.session.removeConsumer(this);
             List<MessageDispatch> list = unconsumedMessages.removeAll();
             if (!this.info.isBrowser()) {
                 for (MessageDispatch old : list) {
@@ -736,8 +738,6 @@
                     session.connection.rollbackDuplicate(this, old.getMessage());
                 }
             }
-            unconsumedMessages.close();
-            this.session.removeConsumer(this);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=900318&r1=900317&r2=900318&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jan 18 09:43:21 2010
@@ -1160,7 +1160,10 @@
                     LOG.error("Failed to page in more queue messages ", e);
                 }
             }
-            return pendingWakeups.decrementAndGet() > 0;
+            if (pendingWakeups.get() > 0) {
+                pendingWakeups.decrementAndGet();
+            }
+            return pendingWakeups.get() > 0;
         }
     }
 
@@ -1333,7 +1336,7 @@
             int toPageIn = Math.min(getMaxPageSize(), messages.size());
             if (LOG.isDebugEnabled()) {
                 LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
-                        + pagedInMessages.size());
+                        + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
             }
 
             if (isLazyDispatch() && !force) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=900318&r1=900317&r2=900318&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Mon Jan 18 09:43:21 2010
@@ -63,7 +63,7 @@
         private boolean running;
         private org.omg.CORBA.IntHolder startup;
         private Thread thread;
-        private int numToProcessPerIteration;
+        private final int numToProcessPerIteration;
 
         Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) {
             this.connectionFactory = connectionFactory;
@@ -206,12 +206,15 @@
                     try {
                         processMessage(session, producer, message);
                         session.commit();
+                        numToProcess--;
                     } catch (Throwable t) {
                         error("message=" + message + " failure", t);
                         session.rollback();
                     }
+                } else {
+                    info("got null message on: " + numToProcess);
                 }
-            } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning());
+            } while ((numToProcessPerIteration == CONSUME_ALL || numToProcess > 0) && isRunning());
         }
 
         public void run() {
@@ -360,7 +363,7 @@
                                     error("Failed to commit with count: " + messageCount.value, e);
                                 }
                             }
-                            messageCount.notify();
+                            messageCount.notifyAll();
                         }
                     } else {
                         error("Producer cannot process " + reply.getClass().getSimpleName());
@@ -441,7 +444,7 @@
     String masterUrl;
 
     public void setUp() throws Exception {
-        setMaxTestTime(10 * 60 * 1000);
+        setMaxTestTime(6 * 60 * 1000);
         setAutoFail(true);
         super.setUp();
         master.setBrokerName("Master");



Mime
View raw message