activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r815788 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
Date Wed, 16 Sep 2009 14:04:20 GMT
Author: rajdavies
Date: Wed Sep 16 14:04:20 2009
New Revision: 815788

URL: http://svn.apache.org/viewvc?rev=815788&view=rev
Log:
fix some synchronization issues

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java?rev=815788&r1=815787&r2=815788&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java	(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java	Wed Sep 16 14:04:20 2009
@@ -28,32 +28,25 @@
 
 public class ConsumerBean extends Assert implements MessageListener {
     private static final Log LOG = LogFactory.getLog(ConsumerBean.class);
-    private List<Message> messages = new ArrayList<Message>();
-    private Object semaphore;
+    private final List<Message> messages = new ArrayList<Message>();
     private boolean verbose;
 
     /**
      * Constructor.
      */
     public ConsumerBean() {
-        this(new Object());
-    }
-
-    /**
-     * Constructor, initialized semaphore object.
-     *
-     * @param semaphore
-     */
-    public ConsumerBean(Object semaphore) {
-        this.semaphore = semaphore;
     }
 
+    
     /**
      * @return all the messages on the list so far, clearing the buffer
      */
-    public synchronized List<Message> flushMessages() {
-        List<Message> answer = new ArrayList<Message>(messages);
+    public List<Message> flushMessages() {
+        List<Message> answer = null;
+        synchronized(messages) {
+        answer = new ArrayList<Message>(messages);
         messages.clear();
+        }
         return answer;
     }
 
@@ -62,13 +55,13 @@
      *
      * @param message
      */
-    public synchronized void onMessage(Message message) {
-        messages.add(message);
-        if (verbose) {
-            LOG.info("Received: " + message);
-        }
-        synchronized (semaphore) {
-            semaphore.notifyAll();
+    public void onMessage(Message message) {
+        synchronized (messages) {
+            messages.add(message);
+            if (verbose) {
+                LOG.info("Received: " + message);
+            }
+            messages.notifyAll();
         }
     }
 
@@ -82,8 +75,8 @@
 
         try {
             if (hasReceivedMessage()) {
-                synchronized (semaphore) {
-                    semaphore.wait(4000);
+                synchronized (messages) {
+                    messages.wait(4000);
                 }
             }
         } catch (InterruptedException e) {
@@ -100,29 +93,30 @@
      * @param messageCount
      */
     public void waitForMessagesToArrive(int messageCount) {
-        final long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
-        LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
    
+        long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
+        LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
         long start = System.currentTimeMillis();
-        for (int i = 0; i < maxRemainingMessageCount; i++) {
+        long maxWaitTime = start + 120 * 1000;
+        while (maxRemainingMessageCount > 0) {
             try {
-                synchronized (semaphore) {
-                    semaphore.wait(1000);
+                synchronized (messages) {
+                    messages.wait(1000);
                 }
-                if (hasReceivedMessages(messageCount)) {
+                if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) {
                     break;
                 }
             } catch (InterruptedException e) {
                 LOG.info("Caught: " + e);
             }
+            maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
         }
         long end = System.currentTimeMillis() - start;
-
         LOG.info("End of wait for " + end + " millis");
     }
 
     public void assertMessagesArrived(int total) {
         waitForMessagesToArrive(total);
-        synchronized (this) {
+        synchronized (messages) {
             int count = messages.size();
 
             assertEquals("Messages received", total, count);
@@ -152,7 +146,9 @@
      * @param messageCount
      * @return
      */
-    protected synchronized boolean hasReceivedMessages(int messageCount) {
-        return messages.size() >= messageCount;
+    protected boolean hasReceivedMessages(int messageCount) {
+        synchronized (messages) {
+            return messages.size() >= messageCount;
+        }
     }
 }



Mime
View raw message