cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r1095349 - /cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
Date Wed, 20 Apr 2011 10:30:44 GMT
Author: ningjiang
Date: Wed Apr 20 10:30:44 2011
New Revision: 1095349

URL: http://svn.apache.org/viewvc?rev=1095349&view=rev
Log:
CXF-3464 AutomaticWorkQueueImpl uses a DelayQueue to accept the tasks which is delayed

Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1095349&r1=1095348&r2=1095349&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Wed Apr 20 10:30:44 2011
@@ -21,11 +21,14 @@ package org.apache.cxf.workqueue;
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,6 +47,8 @@ public class AutomaticWorkQueueImpl exte
         LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
     
     int maxQueueSize;
+    DelayQueue<DelayedTaskWrapper> delayQueue = new DelayQueue<DelayedTaskWrapper>();
+    WatchDog watchDog = new WatchDog(delayQueue);
     
     WorkQueueManagerImpl manager;
     String name = "default";
@@ -116,6 +121,10 @@ public class AutomaticWorkQueueImpl exte
             }
             setCorePoolSize(lowWaterMark);
         }
+        
+        // start the watch dog thread
+        watchDog.setDaemon(true);
+        watchDog.start();
     }
     private static ThreadFactory createThreadFactory(final String name) {
         ThreadGroup group;
@@ -146,17 +155,87 @@ public class AutomaticWorkQueueImpl exte
         }
         return new AWQThreadFactory(group, name);
     }
+    
+    static class DelayedTaskWrapper implements Delayed, Runnable {
+        long trigger;
+        Runnable work;
+        
+        DelayedTaskWrapper(Runnable work, long delay) {
+            this.work = work;
+            trigger = System.currentTimeMillis() + delay;
+        }
+        
+        public long getDelay(TimeUnit unit) {
+            long n = trigger - System.currentTimeMillis();
+            return unit.convert(n, TimeUnit.MILLISECONDS);
+        }
+
+        public int compareTo(Delayed delayed) {
+            long other = ((DelayedTaskWrapper)delayed).trigger;
+            int returnValue;
+            if (this.trigger < other) {
+                returnValue = -1;
+            } else if (this.trigger > other) {
+                returnValue = 1;
+            } else {
+                returnValue = 0;
+            }
+            return returnValue;
+        }
+
+        public void run() {
+            work.run();
+        }
+        
+    }
+    
+    class WatchDog extends Thread {
+        DelayQueue<DelayedTaskWrapper> delayQueue;
+        AtomicBoolean shutdown = new AtomicBoolean(false);
+        
+        WatchDog(DelayQueue<DelayedTaskWrapper> queue) {
+            delayQueue = queue;
+        }
+        
+        public void shutdown() {
+            shutdown.set(true);
+        }
+        
+        public void run() {
+            DelayedTaskWrapper task;
+            try {
+                while (!shutdown.get()) {
+                    task = delayQueue.take();
+                    if (task != null) {
+                        try {
+                            execute(task);
+                        } catch (Exception ex) {
+                            LOG.warning("Executing the task from DelayQueue with exception: " + ex);
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.finer("The DelayQueue watchdog Task is stopping");
+                }
+            }
+
+        }
+        
+    }
     static class AWQThreadFactory implements ThreadFactory {
         final AtomicInteger threadNumber = new AtomicInteger(1);
         ThreadGroup group;
         String name;
         ClassLoader loader;
+        
         AWQThreadFactory(ThreadGroup gp, String nm) {
             group = gp;
             name = nm;
             //force the loader to be the loader of CXF, not the application loader
             loader = AutomaticWorkQueueImpl.class.getClassLoader();
         }
+        
         public Thread newThread(Runnable r) {
             if (group.isDestroyed()) {
                 group = new ThreadGroup(group.getParent(), name + "-workqueue");
@@ -273,18 +352,7 @@ public class AutomaticWorkQueueImpl exte
     }
 
     public void schedule(final Runnable work, final long delay) {
-        // temporary implementation, replace with shared long-lived scheduler
-        // task
-        execute(new Runnable() {
-            public void run() {
-                try {
-                    Thread.sleep(delay);
-                } catch (InterruptedException ie) {
-                    // ignore
-                }
-                work.run();
-            }
-        });
+        delayQueue.put(new DelayedTaskWrapper(work, delay));
     }
     
     // AutomaticWorkQueue interface
@@ -301,6 +369,7 @@ public class AutomaticWorkQueueImpl exte
         if (f instanceof AWQThreadFactory) {
             ((AWQThreadFactory)f).shutdown();
         }
+        watchDog.shutdown();
     }
 
     /**



Mime
View raw message