activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r573400 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/bugs/
Date Thu, 06 Sep 2007 22:19:27 GMT
Author: chirino
Date: Thu Sep  6 15:19:25 2007
New Revision: 573400

URL: http://svn.apache.org/viewvc?rev=573400&view=rev
Log:
Fix for the QueueWorkerPrefetchTest. The VMPendingMessageCursor.isEmpty() was returning true
when it had a message that had been marked dropped because it was delivered by another
subscription.
 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
Thu Sep  6 15:19:25 2007
@@ -55,5 +55,10 @@
      * Returns true if this message is expired
      */
     boolean isExpired();
+
+    /**
+     * Returns true if this message is dropped.
+     */
+    boolean isDropped();
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Thu Sep  6 15:19:25 2007
@@ -35,7 +35,19 @@
      * @return true if there are no pending messages
      */
     public boolean isEmpty() {
-        return list.isEmpty();
+        if (list.isEmpty()) {
+            return true;
+        } else {
+            for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();)
{
+                MessageReference node = iterator.next();
+                if (!node.isDropped()) {
+                    return false;
+                }
+                // We can remove dropped references.
+                iterator.remove();
+            }
+            return true;
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu
Sep  6 15:19:25 2007
@@ -689,4 +689,8 @@
     public void setBrokerOutTime(long brokerOutTime) {
         this.brokerOutTime = brokerOutTime;
     }
+    
+    public boolean isDropped() {
+        return false;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
Thu Sep  6 15:19:25 2007
@@ -29,6 +29,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 
@@ -42,6 +43,7 @@
  */
 public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
 {
+    private static final int BATCH_SIZE = 10;
     private static final long WAIT_TIMEOUT = 1000*10;
 
     /** The connection URL. */
@@ -70,6 +72,14 @@
     /** Messages sent to the work-item queue. */
     private static class WorkMessage implements Serializable
     {
+        private final int id;
+        public WorkMessage(int id) {
+            this.id = id;
+        }
+        @Override
+        public String toString() {
+            return "Work: "+id;
+        }
     }
 
     /**
@@ -79,6 +89,7 @@
      */
     private static class Worker implements MessageListener
     {
+
         /** Counter shared between workers to decided when new work-item messages are created.
*/
         private static AtomicInteger counter = new AtomicInteger(0);
 
@@ -106,24 +117,23 @@
         {
             try
             {
-                boolean sendMessage = false;
-
-                // Don't create a new work item for every 1000th message. */
-                if (counter.incrementAndGet() % 1000 != 0)
-                {
-                    sendMessage = true;
+                WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
+                
+                long c = counter.incrementAndGet();
+                if (c % 1 == 0) {
+                    System.out.println("Worker now has message count of: " + c);
                 }
 
-                if (sendMessage)
+                // Don't create a new work item for every BATCH_SIZE message. */
+                if (c % BATCH_SIZE != 0)
                 {
                     // Send new work item to work-item queue.
                     workItemProducer.send(session.createObjectMessage(
-                            new WorkMessage()));
+                            new WorkMessage(work.id+1)));
                 }
 
                 // Send ack to master.
-                masterItemProducer.send(session.createObjectMessage(
-                        new WorkMessage()));
+                masterItemProducer.send(session.createObjectMessage(work));
             }
             catch (JMSException e)
             {
@@ -145,7 +155,7 @@
     {
         long acks = acksReceived.incrementAndGet();
         latch.get().countDown();
-        if (acks % 100 == 0) {
+        if (acks % 1 == 0) {
             System.out.println("Master now has ack count of: " + acksReceived);
         }
     }
@@ -193,10 +203,10 @@
             workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
         }
 
-        // Send a message to the work queue, and wait for the 1000 acks from the workers.
+        // Send a message to the work queue, and wait for the BATCH_SIZE acks from the workers.
         acksReceived.set(0);
-        latch.set(new CountDownLatch(1000));
-        workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
+        latch.set(new CountDownLatch(BATCH_SIZE));
+        workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
         
         if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
             fail("First batch only received " + acksReceived + " messages");
@@ -209,8 +219,8 @@
         // have a large pending queue.  Creating a new worker at this point however will
         // receive this new message.
         acksReceived.set(0);
-        latch.set(new CountDownLatch(1000));
-        workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
+        latch.set(new CountDownLatch(BATCH_SIZE));
+        workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
         
         if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
             fail("Second batch only received " + acksReceived + " messages");



Mime
View raw message