camel-dev mailing list archives

From "William Tam (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (CAMEL-1510) BatchProcessor interrupt has side effects
Date Thu, 16 Apr 2009 16:51:31 GMT

    [ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51230#action_51230 ]

William Tam edited comment on CAMEL-1510 at 4/16/09 9:49 AM:
-------------------------------------------------------------

Thanks guys.  Christopher, your patch looks good.  I slightly modified it. 

1. I got it to pass the style check.  FYI, you can enable the style checker with "mvn -Psourcecheck".

2. I made sure queue.size() is called with queueLock held.

(Regarding the while loop around "isInBatchCompleted(queue.size())": your patch and the original
code are correct.  The loop is still needed, but only for the timeout case.)
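
For anyone following along, here is a rough, self-contained sketch of what point 2 is getting at: the queue size is only read while the lock is held, and the sender is woken through a Condition rather than an interrupt.  The class name LockedBatchQueue and its methods are made up for illustration; this is not the BatchProcessor code or the revised patch, and the wake-up rule (signal once a full batch is queued) is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the sender's queue handling; not Camel's BatchProcessor.
class LockedBatchQueue<T> {
    private final Queue<T> queue = new LinkedList<T>();
    private final Lock queueLock = new ReentrantLock();
    private final Condition batchReady = queueLock.newCondition();
    private final int batchSize;

    LockedBatchQueue(int batchSize) {
        this.batchSize = batchSize;
    }

    // Producer side: add an item and signal the sender without interrupting it.
    void enqueue(T item) {
        queueLock.lock();
        try {
            queue.add(item);
            if (queue.size() >= batchSize) {
                batchReady.signal();
            }
        } finally {
            queueLock.unlock();
        }
    }

    // Sender side: wait for a full batch or the timeout, then drain up to batchSize items.
    List<T> awaitBatch(long timeoutMillis) {
        List<T> batch = new ArrayList<T>();
        queueLock.lock();
        try {
            long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            // queue.size() is read only while queueLock is held; the loop also
            // guards against spurious wake-ups.
            while (queue.size() < batchSize && remaining > 0) {
                try {
                    remaining = batchReady.awaitNanos(remaining);
                } catch (InterruptedException e) {
                    // Interrupt is reserved for shutdown: restore the flag and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (int i = 0; i < batchSize; i++) {
                T item = queue.poll();
                if (item == null) {
                    break;
                }
                batch.add(item);
            }
        } finally {
            queueLock.unlock();
        }
        return batch;
    }
}
```

On a timeout the drain can return fewer than batchSize items, which is the partial-batch case where a caller would still need to loop.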





      was (Author: wtam):
    Thanks guys.  Christopher, I have a few questions/comments in the patch.

1. Please use spaces instead of tabs for indentation.  You can run the style checker
with "mvn -Psourcecheck".

Actually, now that we don't use thread interruption as a signaling mechanism, we do have to put
the drainQueueTo() in a while loop.  (I removed some of my comments).  I will revise the patch
and post it.


  
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Assignee: William Tam
>            Priority: Critical
>         Attachments: BatchProcessor.java.20.diff
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt method to
> wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle of processing
> exchanges, and the processing involves something slow like establishing a JMS connection over
> SSL or queuing to an asynchronous processor, then the processing can be interrupted. The
> consequence is that the batch sender thread rarely gets the opportunity
> to complete properly, and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that continuously
> added exchanges to the aggregator, reached the threshold, and then tried to enqueue
> the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend using finer-grained
> concurrency controls instead of relying upon interrupting a thread. Perhaps something
> like the following (untested) re-write of the sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and guarded its access with a mutex.
> In addition, any queuing of exchanges is noted. This should result in less locking.
> The main change, though, is that queuing an exchange does not interrupt the batch sender's
> current activity.
> I hope that this sample is useful.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

