Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3F9AE776D for ; Fri, 19 Aug 2011 17:19:20 +0000 (UTC) Received: (qmail 56591 invoked by uid 500); 19 Aug 2011 17:19:20 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 56473 invoked by uid 500); 19 Aug 2011 17:19:19 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 56466 invoked by uid 99); 19 Aug 2011 17:19:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2011 17:19:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2011 17:19:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 007C02388A5E for ; Fri, 19 Aug 2011 17:18:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1159695 - in /cxf/trunk/rt: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/workqueue/ core/src/test/java/org/apache/cxf/workqueue/ ws/addr/src/main/java/org/apache/cxf/ws/addressing/ Date: Fri, 19 Aug 2011 17:18:56 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110819171857.007C02388A5E@eris.apache.org> Author: dkulp Date: Fri Aug 19 17:18:56 2011 New Revision: 1159695 URL: http://svn.apache.org/viewvc?rev=1159695&view=rev Log: [CXF-3750] Fix problem with CXF could lockup with a lot of long running one-way operations. Fix problems with OneWay and decoupled ws-addr not buffering the incoming stream properly Fix AutomaticWorkqueue to actually create threads as needed PRIOR to the queue filling completely up. Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java?rev=1159695&r1=1159694&r2=1159695&view=diff ============================================================================== --- cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java (original) +++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java Fri Aug 19 17:18:56 2011 @@ -76,7 +76,7 @@ public class OneWayProcessorInterceptor //need to suck in all the data from the input stream as //the transport might discard any data on the stream when this //thread unwinds or when the empty response is sent back - DelegatingInputStream in = message.get(DelegatingInputStream.class); + DelegatingInputStream in = message.getContent(DelegatingInputStream.class); if (in != null) { in.cacheInput(); } @@ -102,12 +102,13 @@ public class OneWayProcessorInterceptor if (Boolean.FALSE.equals(o)) { chain.pause(); try { - synchronized (chain) { + final Object lock = new Object(); + synchronized (lock) { message.getExchange().get(Bus.class).getExtension(WorkQueueManager.class) .getAutomaticWorkQueue().execute(new Runnable() { public void run() { - synchronized (chain) { - chain.notifyAll(); + synchronized (lock) { + lock.notifyAll(); } chain.resume(); } @@ -115,7 +116,7 @@ public class OneWayProcessorInterceptor //wait a few milliseconds for the background thread to start processing //Mostly just to make an attempt at keeping the ordering of the //messages coming in from a client. Not guaranteed though. - chain.wait(20); + lock.wait(20); } } catch (RejectedExecutionException e) { //the executor queue is full, so run the task in the caller thread Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1159695&r1=1159694&r2=1159695&view=diff ============================================================================== --- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original) +++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Fri Aug 19 17:18:56 2011 @@ -19,6 +19,8 @@ package org.apache.cxf.workqueue; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.DelayQueue; @@ -30,6 +32,7 @@ import java.util.concurrent.ThreadPoolEx import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,6 +55,9 @@ public class AutomaticWorkQueueImpl exte WorkQueueManagerImpl manager; String name = "default"; + final int corePoolSize; + final int maxPoolSize; + final ReentrantLock mainLock; public AutomaticWorkQueueImpl() { this(DEFAULT_MAX_QUEUE_SIZE); @@ -125,6 +131,19 @@ public class AutomaticWorkQueueImpl exte // start the watch dog thread watchDog.setDaemon(true); watchDog.start(); + + corePoolSize = this.getCorePoolSize(); + maxPoolSize = this.getMaximumPoolSize(); + + ReentrantLock l = null; + try { + Field f = ThreadPoolExecutor.class.getDeclaredField("mainLock"); + f.setAccessible(true); + l = (ReentrantLock)f.get(this); + } catch (Throwable t) { + l = new ReentrantLock(); + } + mainLock = l; } private static ThreadFactory createThreadFactory(final String name) { ThreadGroup group; @@ -337,7 +356,33 @@ public class AutomaticWorkQueueImpl exte } } }; + //The ThreadPoolExecutor in the JDK doesn't expand the number + //of threads until the queue is full. However, we would + //prefer the number of threads to expand immediately and + //only uses the queue if we've reached the maximum number + //of threads. Thus, we'll set the core size to the max, + //add the runnable, and set back. That will cause the + //threads to be created as needed. super.execute(r); + if (!getQueue().isEmpty() && this.getPoolSize() < maxPoolSize) { + mainLock.lock(); + try { + int ps = this.getPoolSize(); + int sz = getQueue().size(); + int sz2 = this.getActiveCount(); + + if ((sz + sz2) > ps) { + Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize", + Runnable.class); + m.setAccessible(true); + m.invoke(this, new Object[1]); + } + } catch (Exception ex) { + //ignore + } finally { + mainLock.unlock(); + } + } } // WorkQueue interface Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java?rev=1159695&r1=1159694&r2=1159695&view=diff ============================================================================== --- cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java (original) +++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java Fri Aug 19 17:18:56 2011 @@ -281,7 +281,7 @@ public class AutomaticWorkQueueTest exte // Give threads a chance to dequeue (5sec max) int i = 0; - while (workqueue.getPoolSize() != 10 && i++ < 50) { + while (workqueue.getPoolSize() > 10 && i++ < 50) { try { Thread.sleep(100); } catch (InterruptedException ie) { @@ -300,7 +300,7 @@ public class AutomaticWorkQueueTest exte } @Test - public void testThreadPoolShrinkUnbounded() { + public void testThreadPoolShrinkUnbounded() throws Exception { workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE, UNBOUNDED_HIGH_WATER_MARK, DEFAULT_LOW_WATER_MARK, 100L); @@ -311,18 +311,15 @@ public class AutomaticWorkQueueTest exte // Give threads a chance to dequeue (5sec max) int i = 0; int last = workqueue.getPoolSize(); - while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) { + while (workqueue.getPoolSize() > DEFAULT_LOW_WATER_MARK && i++ < 50) { if (last != workqueue.getPoolSize()) { last = workqueue.getPoolSize(); i = 0; } - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - // ignore - } + Thread.sleep(100); } - assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK); + int sz = workqueue.getPoolSize(); + assertTrue("threads_total(): " + sz, workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK); } @Test Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=1159695&r1=1159694&r2=1159695&view=diff ============================================================================== --- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original) +++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Fri Aug 19 17:18:56 2011 @@ -434,7 +434,7 @@ public final class ContextUtils { //need to suck in all the data from the input stream as //the transport might discard any data on the stream when this //thread unwinds or when the empty response is sent back - DelegatingInputStream in = inMessage.get(DelegatingInputStream.class); + DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class); if (in != null) { in.cacheInput(); }