Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2402F197B for ; Wed, 20 Apr 2011 10:36:03 +0000 (UTC) Received: (qmail 10792 invoked by uid 500); 20 Apr 2011 10:36:03 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 10739 invoked by uid 500); 20 Apr 2011 10:36:03 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 10732 invoked by uid 99); 20 Apr 2011 10:36:03 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Apr 2011 10:36:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Apr 2011 10:36:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A659E2388A33; Wed, 20 Apr 2011 10:35:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1095350 - in /cxf/branches/2.3.x-fixes: ./ rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Date: Wed, 20 Apr 2011 10:35:38 -0000 To: commits@cxf.apache.org From: ningjiang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110420103538.A659E2388A33@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ningjiang Date: Wed Apr 20 10:35:38 2011 New Revision: 1095350 URL: http://svn.apache.org/viewvc?rev=1095350&view=rev Log: Merged revisions 1095349 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r1095349 | ningjiang | 2011-04-20 18:30:44 +0800 (Wed, 20 Apr 2011) | 1 line CXF-3464 AutomaticWorkQueueImpl uses a DelayQueue to accept the tasks which is delayed ........ Modified: cxf/branches/2.3.x-fixes/ (props changed) cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Propchange: cxf/branches/2.3.x-fixes/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Apr 20 10:35:38 2011 @@ -1 +1 @@ -/cxf/trunk:1094926,1094992 +/cxf/trunk:1094926,1094992,1095349 Propchange: cxf/branches/2.3.x-fixes/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1095350&r1=1095349&r2=1095350&view=diff ============================================================================== --- cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original) +++ cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Wed Apr 20 10:35:38 2011 @@ -21,11 +21,14 @@ package org.apache.cxf.workqueue; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -44,6 +47,8 @@ public class AutomaticWorkQueueImpl exte LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class); int maxQueueSize; + DelayQueue delayQueue = new DelayQueue(); + WatchDog watchDog = new WatchDog(delayQueue); WorkQueueManagerImpl manager; String name = "default"; @@ -116,6 +121,10 @@ public class AutomaticWorkQueueImpl exte } setCorePoolSize(lowWaterMark); } + + // start the watch dog thread + watchDog.setDaemon(true); + watchDog.start(); } private static ThreadFactory createThreadFactory(final String name) { ThreadGroup group; @@ -146,17 +155,87 @@ public class AutomaticWorkQueueImpl exte } return new AWQThreadFactory(group, name); } + + static class DelayedTaskWrapper implements Delayed, Runnable { + long trigger; + Runnable work; + + DelayedTaskWrapper(Runnable work, long delay) { + this.work = work; + trigger = System.currentTimeMillis() + delay; + } + + public long getDelay(TimeUnit unit) { + long n = trigger - System.currentTimeMillis(); + return unit.convert(n, TimeUnit.MILLISECONDS); + } + + public int compareTo(Delayed delayed) { + long other = ((DelayedTaskWrapper)delayed).trigger; + int returnValue; + if (this.trigger < other) { + returnValue = -1; + } else if (this.trigger > other) { + returnValue = 1; + } else { + returnValue = 0; + } + return returnValue; + } + + public void run() { + work.run(); + } + + } + + class WatchDog extends Thread { + DelayQueue delayQueue; + AtomicBoolean shutdown = new AtomicBoolean(false); + + WatchDog(DelayQueue queue) { + delayQueue = queue; + } + + public void shutdown() { + shutdown.set(true); + } + + public void run() { + DelayedTaskWrapper task; + try { + while (!shutdown.get()) { + task = delayQueue.take(); + if (task != null) { + try { + execute(task); + } catch (Exception ex) { + LOG.warning("Executing the task from DelayQueue with exception: " + ex); + } + } + } + } catch (InterruptedException e) { + if (LOG.isLoggable(Level.FINE)) { + LOG.finer("The DelayQueue watchdog Task is stopping"); + } + } + + } + + } static class AWQThreadFactory implements ThreadFactory { final AtomicInteger threadNumber = new AtomicInteger(1); ThreadGroup group; String name; ClassLoader loader; + AWQThreadFactory(ThreadGroup gp, String nm) { group = gp; name = nm; //force the loader to be the loader of CXF, not the application loader loader = AutomaticWorkQueueImpl.class.getClassLoader(); } + public Thread newThread(Runnable r) { if (group.isDestroyed()) { group = new ThreadGroup(group.getParent(), name + "-workqueue"); @@ -273,18 +352,7 @@ public class AutomaticWorkQueueImpl exte } public void schedule(final Runnable work, final long delay) { - // temporary implementation, replace with shared long-lived scheduler - // task - execute(new Runnable() { - public void run() { - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - // ignore - } - work.run(); - } - }); + delayQueue.put(new DelayedTaskWrapper(work, delay)); } // AutomaticWorkQueue interface @@ -301,6 +369,7 @@ public class AutomaticWorkQueueImpl exte if (f instanceof AWQThreadFactory) { ((AWQThreadFactory)f).shutdown(); } + watchDog.shutdown(); } /**