camel-dev mailing list archives

From Gert Vanthienen <gert.vanthie...@gmail.com>
Subject Re: BatchProcessor interrupt side effects
Date Wed, 01 Apr 2009 13:02:48 GMT
L.S.,

First of all, thanks for taking the time to review the code and come
up with these suggestions.  Yes, I think it would probably make sense
to leverage the helper classes available in java.util.concurrent
instead of calling Thread.sleep() ourselves and then interrupting the
thread when an Exchange arrives.  I do wonder whether we could simply
rely on a thread-safe queue or a latch to provide the same semantics,
but in any case you're welcome to raise a JIRA issue and attach your
proposed patch there.
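
To make the queue idea a bit more concrete, here is a minimal sketch of
what I have in mind. It is not Camel code: QueueBatcher, enqueue and
nextBatch are hypothetical names, and plain Strings stand in for
Exchanges. The consumer waits on the queue itself via a timed poll(),
so the producer never needs to interrupt or signal anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BatchSender; items are Strings, not Camel Exchanges.
class QueueBatcher {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    private final int batchSize;
    private final long batchTimeoutMillis;

    QueueBatcher(int batchSize, long batchTimeoutMillis) {
        this.batchSize = batchSize;
        this.batchTimeoutMillis = batchTimeoutMillis;
    }

    // Producer side: adding to the queue is enough to wake a consumer blocked in poll().
    void enqueue(String item) {
        queue.add(item);
    }

    // Consumer side: blocks until batchSize items have been collected or the
    // timeout has elapsed, whichever comes first, and returns what was collected.
    List<String> nextBatch() throws InterruptedException {
        List<String> batch = new ArrayList<String>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeoutMillis);
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // batch timeout reached
            }
            String item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // nothing arrived before the timeout
            }
            batch.add(item);
        }
        return batch;
    }
}
```

With this shape an interrupt would be reserved for cancellation only, so
slow processing of a batch could no longer be interrupted by an arriving
Exchange. Whether it covers everything the existing batch predicates do
would still need checking against the real BatchProcessor.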

Regards,

Gert Vanthienen
------------------------
Open Source SOA: http://fusesource.com
Blog: http://gertvanthienen.blogspot.com/



2009/4/1 huntc <huntc@mac.com>:
>
> I have noticed that the BatchProcessor class calls the Thread class interrupt
> method from within the enqueueExchange method to wake the run loop from its sleep.
>
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL, then the processing can become
> interrupted.
>
> This all became apparent during some performance testing that continuously
> added exchanges to the aggregator; once the threshold was reached, the
> aggregator tried to enqueue the aggregated result to a JMS queue.
>
> If my analysis of the BatchProcessor is correct then I would recommend a
> Condition object being used instead of relying upon interrupting a thread.
> Perhaps something like the following (untested - could be mistakes of course
> - assumes present Camel 1.6 trunk):
>
>
>    private class BatchSender extends Thread {
>
>        private boolean enqueuedExchange = false;
>        private Lock runLock = new ReentrantLock();
>        private Condition runCondition = runLock.newCondition();
>
>        private LinkedBlockingQueue<Exchange> queue;
>
>        public BatchSender() {
>            super("Batch Sender");
>            this.queue = new LinkedBlockingQueue<Exchange>();
>        }
>
>        @Override
>        public void run() {
>            runLock.lock();
>            try {
>                do {
>                    try {
>                        if (!enqueuedExchange) {
>                            runCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                            if (!enqueuedExchange) {
>                                queue.drainTo(collection, batchSize);
>                            }
>                        }
>
>                        if (enqueuedExchange) {
>                            enqueuedExchange = false;
>
>                            runLock.unlock();
>                            try {
>                                while (isInBatchCompleted(queue.size())) {
>                                    queue.drainTo(collection, batchSize);
>                                }
>
>                                if (!isOutBatchCompleted()) {
>                                    continue;
>                                }
>
>                            } finally {
>                                runLock.lock();
>                            }
>                        }
>
>                        runLock.unlock();
>                        try {
>                            try {
>                                sendExchanges();
>                            } catch (Exception e) {
>                                getExceptionHandler().handleException(e);
>                            }
>
>                        } finally {
>                            runLock.lock();
>                        }
>
>                    } catch (InterruptedException e) {
>                        break;
>                    }
>
>                } while (true);
>
>            } finally {
>                runLock.unlock();
>            }
>
>        }
>
>        public void cancel() {
>            interrupt();
>        }
>
>        public void enqueueExchange(Exchange exchange) {
>            queue.add(exchange);
>            runLock.lock();
>            try {
>                enqueuedExchange = true;
>                runCondition.signal();
>            } finally {
>                runLock.unlock();
>            }
>        }
>
>        private void sendExchanges() throws Exception {
>            Iterator<Exchange> iter = collection.iterator();
>            while (iter.hasNext()) {
>                Exchange exchange = iter.next();
>                iter.remove();
>                processExchange(exchange);
>            }
>        }
>    }
>
>
> I acknowledge that the above is more complex than what is there presently,
> but I think the complexity is necessary. Cancellations will still result in
> an interrupt and behave as they presently do. The major difference lies in
> being explicit about the condition for going to sleep (nothing enqueued
> that hasn't had its predicate tested) and about what must be done on waking
> (if the thread slept at all). Either exchanges have been enqueued or it
> is time to process exchanges. Even if exchanges have been enqueued, flow
> can still drop down into sending exchanges if the batch thresholds have been
> reached, i.e. as before.
>
> I also make a point of unlocking the runLock prior to calling out to
> dependent code.
>
> Thoughts?
> --
> View this message in context: http://www.nabble.com/BatchProcessor-interrupt-side-effects-tp22819960p22819960.html
> Sent from the Camel - Development (activemq) mailing list archive at Nabble.com.
>
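
The side effect described in the quoted message can be reproduced outside
Camel. The following is a minimal sketch, not BatchProcessor code:
InterruptSideEffectDemo and workWasInterrupted are hypothetical names, and
Thread.sleep() stands in for slow blocking work such as establishing a JMS
connection. It shows that an interrupt intended only as a wake-up lands on
whatever blocking call the thread happens to be in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; it does not use any Camel types.
class InterruptSideEffectDemo {

    // Returns true if the "wake-up" interrupt broke the slow work instead.
    static boolean workWasInterrupted() throws InterruptedException {
        final CountDownLatch workStarted = new CountDownLatch(1);
        final AtomicBoolean workInterrupted = new AtomicBoolean(false);

        Thread sender = new Thread(new Runnable() {
            public void run() {
                // The thread is past its idle wait and about to process a batch.
                workStarted.countDown();
                try {
                    // Stand-in for slow, blocking processing.
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // The interrupt was delivered to the processing step.
                    workInterrupted.set(true);
                }
            }
        }, "Batch Sender");
        sender.start();

        workStarted.await();
        // The caller cannot tell whether the thread is idle or busy.
        sender.interrupt();
        sender.join();
        return workInterrupted.get();
    }
}
```

A Condition or a blocking queue avoids this because the wake-up is only
observed while the thread is actually waiting on it.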
