camel-dev mailing list archives

From huntc <hu...@mac.com>
Subject BatchProcessor interrupt side effects
Date Wed, 01 Apr 2009 05:15:26 GMT

I have noticed that the BatchProcessor class uses the Thread class interrupt
method to wake the run loop from sleeping within the enqueueExchange method.

The unfortunate side effect of this is that if the run loop is in the middle
of processing exchanges, and the processing involves something slow like
establishing a JMS connection over SSL, then the processing can become
interrupted.
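
To illustrate the side effect in isolation, here is a minimal sketch that
does not use Camel at all. The class name InterruptSideEffectDemo and the
use of Thread.sleep as a stand-in for slow blocking work (such as opening a
connection) are assumptions made for the example only. An interrupt that is
meant purely as a wake-up arrives while the worker is busy and cuts the work
short:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Camel.
public class InterruptSideEffectDemo {

    /** Returns true if the simulated slow work was cut short by the interrupt. */
    public static boolean workWasInterrupted() {
        final CountDownLatch workStarted = new CountDownLatch(1);
        final AtomicBoolean interruptedMidWork = new AtomicBoolean(false);

        Thread worker = new Thread(new Runnable() {
            public void run() {
                workStarted.countDown();
                try {
                    // Stand-in for slow, blocking processing.
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // The "wake-up" landed in the middle of the work.
                    interruptedMidWork.set(true);
                }
            }
        }, "Batch Sender (demo)");
        worker.start();

        try {
            workStarted.await();
            // The producer only wants to wake a sleeping run loop,
            // but the worker is already busy processing.
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interruptedMidWork.get();
    }

    public static void main(String[] args) {
        System.out.println("work interrupted: " + workWasInterrupted());
    }
}
```

The interrupt cannot distinguish between a thread that is sleeping while it
waits for a batch and a thread that is blocked inside the processing itself.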

This became apparent during performance testing in which exchanges were
continuously added to the aggregator, the batch threshold was reached, and
the aggregated result was then enqueued to a JMS queue.

If my analysis of the BatchProcessor is correct, I would recommend using a
Condition object rather than relying on interrupting a thread. Perhaps
something like the following (untested, so there could be mistakes; assumes
the present Camel 1.6 trunk):


    private class BatchSender extends Thread {
        
        // Set under runLock when an exchange has been enqueued but not yet examined.
        private boolean enqueuedExchange = false;
        private final Lock runLock = new ReentrantLock();
        private final Condition runCondition = runLock.newCondition();
        
        private final LinkedBlockingQueue<Exchange> queue;
        
        public BatchSender() {
            super("Batch Sender");
            this.queue = new LinkedBlockingQueue<Exchange>();
        }

        @Override
        public void run() {
            runLock.lock();
            try {
                do {
                    try {
                        if (!enqueuedExchange) {
                            runCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                            // Woken by timeout (or spuriously) with nothing newly enqueued.
                            if (!enqueuedExchange) {
                                queue.drainTo(collection, batchSize);
                            }
                        }
                        
                        if (enqueuedExchange) {
                            enqueuedExchange = false;
                            
                            runLock.unlock();
                            try {
                                while (isInBatchCompleted(queue.size())) {
                                    queue.drainTo(collection, batchSize);  
                                }
                                
                                if (!isOutBatchCompleted()) {
                                    continue;
                                }
                                
                            } finally {
                                runLock.lock();
                            }
                        }
                        
                        runLock.unlock();
                        try {
                            try {
                                sendExchanges();
                            } catch (Exception e) {
                                getExceptionHandler().handleException(e);
                            }
                            
                        } finally {
                            runLock.lock();
                        }

                    } catch (InterruptedException e) {
                        break;
                    }
                    
                } while (true);
                
            } finally {
                runLock.unlock();
            }
            
        }
        
        public void cancel() {
            interrupt();
        }
     
        public void enqueueExchange(Exchange exchange) {
            queue.add(exchange);
            runLock.lock();
            try {
                enqueuedExchange = true;
                runCondition.signal();
            } finally {
                runLock.unlock();
            }
        }
        
        private void sendExchanges() throws Exception {
            Iterator<Exchange> iter = collection.iterator();
            while (iter.hasNext()) {
                Exchange exchange = iter.next();
                iter.remove();
                processExchange(exchange);
            }
        }
    }


I acknowledge that the above is more complex than what is there at present,
but I think the complexity is necessary. Cancellation still results in an
interrupt and behaves as it does now. The major difference is that the loop
is explicit about the condition under which it goes to sleep (nothing has
been enqueued whose predicate has not yet been tested) and about what must
be done when it wakes up (if it slept at all): either exchanges have been
enqueued, or it is time to process exchanges. Even when exchanges have been
enqueued, flow can still drop down into sending exchanges if the batch
thresholds have been reached, i.e. as before.
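
The two wake-up reasons (something was enqueued, or the timeout elapsed) can
be sketched on their own, outside Camel. This is only an illustration: the
class WakeReasonSketch and its methods are hypothetical names, and unlike
the code above it loops on the remaining time with awaitNanos so that a
spurious wake-up is not mistaken for a timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; not part of Camel.
public class WakeReasonSketch {
    private final Lock runLock = new ReentrantLock();
    private final Condition runCondition = runLock.newCondition();
    private boolean enqueued = false; // guarded by runLock

    /** Producer side: record that something was enqueued and wake the waiter. */
    public void enqueue() {
        runLock.lock();
        try {
            enqueued = true;
            runCondition.signal();
        } finally {
            runLock.unlock();
        }
    }

    /**
     * Consumer side: returns true if something was enqueued, false if the
     * timeout elapsed first. The flag is cleared before returning.
     */
    public boolean awaitEnqueue(long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        runLock.lock();
        try {
            // Re-check the flag after every wake-up; awaitNanos returns the time left.
            while (!enqueued && nanos > 0) {
                nanos = runCondition.awaitNanos(nanos);
            }
            boolean wasEnqueued = enqueued;
            enqueued = false;
            return wasEnqueued;
        } finally {
            runLock.unlock();
        }
    }

    /** Returns { result with nothing enqueued, result after enqueue() }. */
    public static boolean[] demo() {
        WakeReasonSketch sketch = new WakeReasonSketch();
        try {
            boolean first = sketch.awaitEnqueue(20);  // times out
            sketch.enqueue();
            boolean second = sketch.awaitEnqueue(20); // flag already set, no wait
            return new boolean[] { first, second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

An interrupt still propagates out of awaitEnqueue as InterruptedException,
so it remains available as the cancellation signal.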

I also make a point of unlocking the runLock prior to calling out to
dependent code.
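
As a small standalone sketch of that pattern (the class name
UnlockBeforeCalloutDemo and the callOut helper are hypothetical, chosen only
for this example), the lock is released around the callback and re-acquired
in a finally block, so the dependent code never runs while the lock is held:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; not part of Camel.
public class UnlockBeforeCalloutDemo {
    private final ReentrantLock runLock = new ReentrantLock();

    /**
     * Runs the callback with runLock released and re-acquires it afterwards.
     * The caller must hold runLock exactly once when calling this method.
     */
    void callOut(Runnable dependentCode) {
        runLock.unlock();
        try {
            dependentCode.run();
        } finally {
            // Restore the caller's locking state even if the callback throws.
            runLock.lock();
        }
    }

    /** Returns whether the lock was held while the callback ran. */
    public static boolean lockHeldDuringCallout() {
        final UnlockBeforeCalloutDemo demo = new UnlockBeforeCalloutDemo();
        final boolean[] held = new boolean[1];
        demo.runLock.lock();
        try {
            demo.callOut(new Runnable() {
                public void run() {
                    held[0] = demo.runLock.isHeldByCurrentThread();
                }
            });
        } finally {
            demo.runLock.unlock();
        }
        return held[0];
    }

    public static void main(String[] args) {
        System.out.println("lock held during callout: " + lockHeldDuringCallout());
    }
}
```

The point of releasing the lock is that a thread calling enqueueExchange
only contends for it briefly, even when the callback is slow.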

Thoughts?
-- 
View this message in context: http://www.nabble.com/BatchProcessor-interrupt-side-effects-tp22819960p22819960.html
Sent from the Camel - Development (activemq) mailing list archive at Nabble.com.
