Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 18242 invoked from network); 6 Sep 2007 19:26:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Sep 2007 19:26:11 -0000 Received: (qmail 35659 invoked by uid 500); 6 Sep 2007 19:26:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 35633 invoked by uid 500); 6 Sep 2007 19:26:05 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 35622 invoked by uid 99); 6 Sep 2007 19:26:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Sep 2007 12:26:05 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Sep 2007 19:27:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3A4831A9832; Thu, 6 Sep 2007 12:25:47 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r573342 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java Date: Thu, 06 Sep 2007 19:25:47 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070906192547.3A4831A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Sep 6 12:25:46 2007 New Revision: 573342 URL: http://svn.apache.org/viewvc?rev=573342&view=rev Log: Fixed synchronizations so that threads don't block each other's processing as much and now the test works fine without hanging. 
see https://issues.apache.org/activemq/browse/AMQ-1251 Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java?rev=573342&r1=573341&r2=573342&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java Thu Sep 6 12:25:46 2007 @@ -18,26 +18,32 @@ package org.apache.activemq.bugs; import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; -import javax.jms.MessageListener; -import javax.jms.Session; import javax.jms.Connection; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; -import junit.framework.TestCase; - /** * Test case demonstrating situation where messages are not delivered to consumers. */ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener { + private static final long WAIT_TIMEOUT = 1000*10; + /** The connection URL. 
*/ private static final String CONNECTION_URL = "tcp://localhost:61616"; @@ -57,10 +63,9 @@ private MessageConsumer masterItemConsumer; /** The number of acks received by the master. */ - private long acksReceived; + private AtomicLong acksReceived = new AtomicLong(0); - /** The expected number of acks the master should receive. */ - private long expectedCount; + private AtomicReference latch = new AtomicReference(); /** Messages sent to the work-item queue. */ private static class WorkMessage implements Serializable @@ -75,7 +80,7 @@ private static class Worker implements MessageListener { /** Counter shared between workers to decided when new work-item messages are created. */ - private static Integer counter = new Integer(0); + private static AtomicInteger counter = new AtomicInteger(0); /** Session to use. */ private Session session; @@ -104,13 +109,9 @@ boolean sendMessage = false; // Don't create a new work item for every 1000th message. */ - synchronized (counter) + if (counter.incrementAndGet() % 1000 != 0) { - counter++; - if (counter % 1000 != 0) - { - sendMessage = true; - } + sendMessage = true; } if (sendMessage) @@ -140,16 +141,11 @@ } /** Master message handler. Process ack messages. */ - public synchronized void onMessage(javax.jms.Message message) + public void onMessage(javax.jms.Message message) { - acksReceived++; - if (acksReceived == expectedCount) - { - // If expected number of acks are received, wake up the main process. - notify(); - } - if (acksReceived % 100 == 0) - { + long acks = acksReceived.incrementAndGet(); + latch.get().countDown(); + if (acks % 100 == 0) { System.out.println("Master now has ack count of: " + acksReceived); } } @@ -173,7 +169,7 @@ super.tearDown(); } - public synchronized void testActiveMQ() + public void testActiveMQ() throws Exception { // Create the connection to the broker. @@ -198,30 +194,32 @@ } // Send a message to the work queue, and wait for the 1000 acks from the workers. 
- expectedCount = 1000; - acksReceived = 0; + acksReceived.set(0); + latch.set(new CountDownLatch(1000)); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); - while (acksReceived != expectedCount) - { - wait(); + + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { + fail("First batch only received " + acksReceived + " messages"); } + System.out.println("First batch received"); // Send another message to the work queue, and wait for the next 1000 acks. It is // at this point where the workers never get notified of this message, as they // have a large pending queue. Creating a new worker at this point however will // receive this new message. - expectedCount = 2000; + acksReceived.set(0); + latch.set(new CountDownLatch(1000)); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); - while (acksReceived != expectedCount) - { - wait(); + + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { + fail("Second batch only received " + acksReceived + " messages"); } + System.out.println("Second batch received"); // Cleanup all JMS resources. - for (int i = 0; i < NUM_WORKERS; i++) - { + for (int i = 0; i < NUM_WORKERS; i++) { workers[i].close(); } masterSession.close();