From: "William Tam (JIRA)"
To: camel-dev@activemq.apache.org
Reply-To: dev@camel.apache.org
Date: Thu, 16 Apr 2009 12:00:33 -0700 (PDT)
Subject: [jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
Message-ID: <1335262548.1239908433462.JavaMail.jira@brutus>
In-Reply-To: <493292768.1238634334695.JavaMail.jira@brutus>

[
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51236#action_51236 ]

William Tam commented on CAMEL-1510:
------------------------------------

To answer Christopher's question:

"Just wondering... the batch processor's collection should always be a thread-safe type of collection. Is this the case in practice? If the collection is not thread safe then the batch sender run method will contend with the processor's isOutBatchCompleted(), doStop() and getCollection() methods."

The batch processor's collection is accessed exclusively by the BatchSender thread (the one exception being the doStop() method), so the collection does not need to be a thread-safe type. The doStop() method is called during shutdown, and it interrupts the BatchSender thread before calling clear() on the collection, so it should be fine. getCollection() is a protected method and it never gets called. We should probably get rid of getCollection() and make the isInBatchCompleted() and isOutBatchCompleted() methods private. Thoughts?

> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Assignee: William Tam
>            Priority: Critical
>         Attachments: BatchProcessor.java.20.diff
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle of processing exchanges, and the processing involves something slow like establishing a JMS connection over SSL or queuing to an asynchronous processor, then the processing can become interrupted.
> The consequence of this side effect is that the batch sender thread rarely gets the opportunity to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in continuously adding exchanges to the aggregator, the threshold becoming reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer grained concurrency controls being used instead of relying upon interrupting a thread. Perhaps something like the following (untested) re-write of the sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed its access. In addition any queuing of exchanges is noted. This should result in less locking.
> The main change though is that queuing an exchange does not interrupt the batch sender's current activity.
> I hope that this sample is useful.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
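
For anyone who wants to see the side effect from the issue description in isolation, here is a minimal sketch. It is not Camel code: WakeupDemo, slowSendInterrupted and the two latches are hypothetical names made up for the illustration, and the blocked finish.await() only stands in for a slow, interruptible operation such as establishing a JMS connection. The worker is woken either with interrupt() or by signalling a Condition, and the method reports whether the stand-in send was aborted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of Camel.
class WakeupDemo {

    /**
     * Starts a worker that blocks in a stand-in "slow send", then wakes it
     * either with interrupt() or by signalling a Condition. Returns true if
     * the send was aborted by an InterruptedException.
     */
    static boolean slowSendInterrupted(boolean wakeByInterrupt) {
        Lock queueMutex = new ReentrantLock();
        Condition queueCondition = queueMutex.newCondition();
        CountDownLatch sending = new CountDownLatch(1); // worker has started the send
        CountDownLatch finish = new CountDownLatch(1);  // lets the send complete normally
        AtomicBoolean aborted = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            sending.countDown();
            try {
                // Assumption: models a slow, interruptible call made while sending.
                finish.await();
            } catch (InterruptedException e) {
                aborted.set(true);
            }
        }, "Batch Sender (demo)");
        worker.start();

        try {
            sending.await();
            if (wakeByInterrupt) {
                // Wake-up by interrupt: lands on whatever blocking call is in progress.
                worker.interrupt();
            } else {
                // Wake-up by signal: only affects threads waiting on this condition.
                queueMutex.lock();
                try {
                    queueCondition.signal();
                } finally {
                    queueMutex.unlock();
                }
                // The signal left the worker alone, so release the send ourselves.
                finish.countDown();
            }
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread interrupted", e);
        }
        return aborted.get();
    }
}
```

Calling WakeupDemo.slowSendInterrupted(true) returns true, because the interrupt is delivered to the blocking call that happens to be running. Calling it with false returns false, because a signal with no waiter on the condition is simply a no-op.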