camel-dev mailing list archives

From "Claus Ibsen (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (CAMEL-3348) DefaultShutdownStrategy and ShutdownAware (SedaConsumer) losing exchange
Date Tue, 23 Nov 2010 16:25:26 GMT

    [ https://issues.apache.org/activemq/browse/CAMEL-3348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63472#action_63472 ]

Claus Ibsen edited comment on CAMEL-3348 at 11/23/10 11:24 AM:
---------------------------------------------------------------

Great.

Camel 2.6 will be released when it's _done_ :)
Ah okay, we usually do a release every quarter, or maybe a bit faster.

But we are hitting x-mas time, so I assume mid-January 2011.

But anyone is free to raise their voice on the mailing list when they think it's time for a new release.

FuseSource just cut the MR 2.5 release today. This last fix didn't make it, but the commit from yesterday did.
So you can consider using the MR 2.5.0 release.
http://repo.fusesource.com/nexus/content/repositories/releases/org/apache/camel/apache-camel/2.5.0-fuse-00-00/


      was (Author: davsclaus):
    Great.

Camel 2.6 will be released when it's _done_ :)
Ah okay, we usually do a release every quarter, or maybe a bit faster.

But we are hitting x-mas time, so I assume mid-January 2011.

But anyway is free to raise their voice on the mailing list when they think it's time for a new release.

FuseSource just cut the MR 2.5 release today. This last fix didn't make it, but the commit from yesterday did.
So you can consider using the MR 2.5.0 release.
http://repo.fusesource.com/nexus/content/repositories/releases/org/apache/camel/apache-camel/2.5.0-fuse-00-00/

  
> DefaultShutdownStrategy and ShutdownAware (SedaConsumer) losing exchange
> ------------------------------------------------------------------------
>
>                 Key: CAMEL-3348
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-3348
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>            Reporter: Damien Delautre
>            Assignee: Claus Ibsen
>             Fix For: 2.6.0
>
>
> There's a problem when we shut down a CamelContext that has a seda endpoint.
> In the SedaConsumer, the exchange is removed from the queue and only later added to the InflightRepository, as shown in the following code (I added comments where this happens):
> {code}
> public void run() {
>         BlockingQueue<Exchange> queue = endpoint.getQueue();
>         while (queue != null && isRunAllowed()) {
>             final Exchange exchange;
>             try {
>                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS); // The exchange is removed here from the queue
>             } catch (InterruptedException e) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>                 }
>                 continue;
>             }
>             if (exchange != null) {
>                 if (isRunAllowed()) {
>                     try {
>                         sendToConsumers(exchange); // Call to sendToConsumers detailed below
>                         if (exchange.getException() != null) {
>                             getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
>                         }
>                     } catch (Exception e) {
>                         getExceptionHandler().handleException("Error processing exchange", exchange, e);
>                     }
>                 } else {
>                     if (LOG.isWarnEnabled()) {
>                         LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
>                     }
>                     try {
>                         queue.put(exchange);
>                     } catch (InterruptedException e) {
>                         if (LOG.isDebugEnabled()) {
>                             LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
>                         }
>                         continue;
>                     }
>                 }
>             }
>         }
>     }
>     protected void sendToConsumers(Exchange exchange) throws Exception {
>         int size = endpoint.getConsumers().size();
>         if (size > 1) {
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
>             }
>            
>             MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
>             AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
>                 public void done(boolean doneSync) {
>                 }
>             });
>         } else {
>             // This line will create the UnitOfWork (in UnitOfWorkProcessor) which will put the exchange in the InflightRepository
>             AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
>                 public void done(boolean doneSync) {
>                 }
>             });
>         }
>     }
> {code}
> If the shutdown occurs between these two actions, the DefaultShutdownStrategy will shut down the route even though a message is still in progress, and that message will be lost.
> Here is the code of ShutdownTask in DefaultShutdownStrategy that lets the shutdown proceed even if some messages are still in progress. (I added comments to show the state of the seda queue and the InflightRepository when it is called between the queue.poll() and the InflightRepository.add().)
> {code}
> for (Consumer consumer : order.getInputs()) {
>                         // check the number of inflight exchanges, which is 0 because the UnitOfWork is not created yet
>                         int inflight = context.getInflightRepository().size(consumer.getEndpoint());
>                         if (consumer instanceof ShutdownAware) {
>                             // check the number of exchanges in the seda queue, which is 0 because the message is already removed
>                             inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
>                         }
>                         if (inflight > 0) {
>                             size += inflight;
>                             if (LOG.isDebugEnabled()) {
>                                 LOG.debug(inflight + " inflight and pending exchanges for consumer: " + consumer);
>                             }
>                         }
>                     }
> {code}
> You can reproduce it by putting a breakpoint in the method {code}protected void sendToConsumers(Exchange exchange){code} in SedaConsumer and calling stop() on the CamelContext while the thread is suspended at the breakpoint.
> We caught the problem in a unit test that tests the shutdown, at a time when our test server was under heavy load.
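
The window described in the issue can be illustrated in isolation. The following is a minimal, single-threaded sketch and not Camel code: the class name, the String "exchange" and the AtomicInteger standing in for InflightRepository are all assumptions made for illustration. It takes the same three readings the ShutdownTask loop would take (inflight count plus queue size) before the poll, between the poll and the inflight registration, and after the registration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the SedaConsumer / InflightRepository interplay.
class SedaShutdownRaceSketch {
    // Stand-in for the seda endpoint queue (plain strings instead of Exchange).
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the number of exchanges registered in the InflightRepository.
    final AtomicInteger inflight = new AtomicInteger();

    // Mirrors the shape of the shutdown check: inflight plus pending on the queue.
    int pendingAsSeenByShutdown() {
        return inflight.get() + queue.size();
    }

    // Walks through the consumer steps and records what a shutdown check
    // would see at each point.
    int[] observe() {
        queue.offer("exchange-1");
        int beforePoll = pendingAsSeenByShutdown();

        String exchange = queue.poll();          // step 1: removed from the queue
        int inWindow = pendingAsSeenByShutdown(); // neither queued nor inflight

        if (exchange != null) {
            inflight.incrementAndGet();           // step 2: registered as inflight
        }
        int afterRegistration = pendingAsSeenByShutdown();

        return new int[] {beforePoll, inWindow, afterRegistration};
    }

    public static void main(String[] args) {
        int[] seen = new SedaShutdownRaceSketch().observe();
        System.out.println("before poll:        " + seen[0]); // 1
        System.out.println("in the window:      " + seen[1]); // 0
        System.out.println("after registration: " + seen[2]); // 1
    }
}
```

The middle reading is 0 although one exchange is still being handled, which is the state in which a shutdown check would conclude that nothing is pending.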

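One general way to close a window like the one in the issue is to make "remove from the queue" and "count as in progress" a single step with respect to the shutdown check. The sketch below is a hypothetical illustration of that idea only; it is not the fix applied for CAMEL-3348, and the class and method names (AtomicHandoffSketch, claim, complete, pending) are invented. In Camel the inflight registration happens in UnitOfWorkProcessor, so a real change would look different.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: dequeue and in-progress accounting share one lock,
// so a pending check can never observe an exchange in neither place.
class AtomicHandoffSketch {
    private final Object lock = new Object();
    private final Queue<String> queue = new ArrayDeque<>();
    private int inProgress;

    void enqueue(String exchange) {
        synchronized (lock) {
            queue.add(exchange);
        }
    }

    // Consumer side: take an exchange and count it as in progress atomically.
    // Returns null when the queue is empty.
    String claim() {
        synchronized (lock) {
            String exchange = queue.poll();
            if (exchange != null) {
                inProgress++;
            }
            return exchange;
        }
    }

    // Consumer side: called once processing of a claimed exchange has finished.
    void complete() {
        synchronized (lock) {
            inProgress--;
        }
    }

    // Shutdown side: queued plus in-progress, read under the same lock.
    int pending() {
        synchronized (lock) {
            return inProgress + queue.size();
        }
    }

    public static void main(String[] args) {
        AtomicHandoffSketch sketch = new AtomicHandoffSketch();
        sketch.enqueue("exchange-1");
        System.out.println("queued:    " + sketch.pending()); // 1
        sketch.claim();
        System.out.println("claimed:   " + sketch.pending()); // 1
        sketch.complete();
        System.out.println("completed: " + sketch.pending()); // 0
    }
}
```

The trade-off in this sketch is that claim() does not block, so a consumer loop built on it would need its own waiting strategy when the queue is empty.
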
-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

