activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r651316 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/thread/PooledTaskRunner.java test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java
Date Thu, 24 Apr 2008 17:04:06 GMT
Author: chirino
Date: Thu Apr 24 10:03:47 2008
New Revision: 651316

URL: http://svn.apache.org/viewvc?rev=651316&view=rev
Log:
Applied patch from AMQ-1686. Thanks, Gary.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?rev=651316&r1=651315&r2=651316&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Thu Apr 24 10:03:47 2008
@@ -127,24 +127,23 @@
         } finally {
             synchronized( runable ) {
                 iterating = false;
-            }
-        }
-
-        synchronized (runable) {
-            if (shutdown) {
-                queued = false;
                 runable.notifyAll();
-                return;
-            }
+                if (shutdown) {
+                    queued = false;
+                    runable.notifyAll();
+                    return;
+                }
+
+                // If we could not iterate all the items
+                // then we need to re-queue.
+                if (!done) {
+                    queued = true;
+                }
 
-            // If we could not iterate all the items
-            // then we need to re-queue.
-            if (!done) {
-                queued = true;
-            }
+                if (queued) {
+                    executor.execute(runable);
+                }
 
-            if (queued) {
-                executor.execute(runable);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java?rev=651316&r1=651315&r2=651316&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java Thu Apr 24 10:03:47 2008
@@ -21,8 +21,12 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.TestCase;
 
@@ -57,6 +61,62 @@
 
         assertTrue( latch.await( 1, TimeUnit.SECONDS ) );
 
+        runner.shutdown();
+    }
+
+    
+    public void testWakeupResultsInThreadSafeCalls() throws Exception {
+        
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, getName());
+                thread.setDaemon(true);
+                thread.setPriority(Thread.NORM_PRIORITY);
+                return thread;
+            }
+        });
+        final CountDownLatch doneLatch = new CountDownLatch( 100 );
+        final AtomicInteger clashCount = new AtomicInteger();
+        final AtomicInteger count = new AtomicInteger();
+
+
+        final PooledTaskRunner runner = new PooledTaskRunner(executor, new Task() {
+            String threadUnSafeVal = null;
+            public boolean iterate() {
+                if (threadUnSafeVal != null) {
+                    clashCount.incrementAndGet();
+                }
+                threadUnSafeVal = Thread.currentThread().getName();
+                count.incrementAndGet();
+                doneLatch.countDown();
+                if (!threadUnSafeVal.equals(Thread.currentThread().getName())) {
+                    clashCount.incrementAndGet();
+                }
+                threadUnSafeVal = null;
+                return false;
+            }
+        }, 1 );
+
+        Runnable doWakeup = new Runnable() {
+            public void run() {
+                try {
+                    runner.wakeup();
+                } catch (InterruptedException ignored) {
+                }
+            }
+        };
+        
+        final int iterations = 1000;
+        for (int i=0; i< iterations; i++) {
+            if (i%100 == 0) {
+                Thread.sleep(10);
+            }
+            executor.execute(doWakeup);
+        }    
+        
+        doneLatch.await(20, TimeUnit.SECONDS);
+        assertEquals("thread safety clash", 0, clashCount.get());
+        assertTrue("called more than once", count.get() > 1);
         runner.shutdown();
     }
 



Mime
View raw message