camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Calculation of in-flight messages in DefaultShutdownStrategy
Date Thu, 20 Feb 2014 10:40:38 GMT
Messages are not regarded as in-flight while they are holded by aggregator.


On Mon, Feb 10, 2014 at 3:16 PM, chessami92 <chessami92@gmail.com> wrote:
> Hello there,
>
> I am using camel 2.12.2, and I seem to be running into an issue where
> messages are being dropped on graceful shutdown. I've given a sample
> testng/mockito/groovy test that I think demonstrates the issue I am facing -
> the DefaultShutdownStrategy calculates size to be zero when there is really
> still a message pending, then the aggregator is forcibly checked out and
> when the old message finishes processing the route has already been closed
> and there are no consumers.
>
> Can you see any mistake in what I am trying to do, or does this indeed seem
> like a bug with camel? Please note that you can make the test pass by
> increasing the sleep right before the camelContext.stop() line.
>
> import org.apache.camel.CamelContext
> import org.apache.camel.Exchange
> import org.apache.camel.Processor
> import org.apache.camel.ProducerTemplate
> import org.apache.camel.builder.RouteBuilder
> import org.apache.camel.impl.DefaultCamelContext
> import org.apache.camel.processor.aggregate.AggregationStrategy
> import org.mockito.ArgumentCaptor
> import org.mockito.Mock
> import org.mockito.Mockito
> import org.testng.annotations.AfterMethod
> import org.testng.annotations.BeforeMethod
> import org.testng.annotations.Test
>
> import static org.apache.camel.ShutdownRoute.Defer
> import static org.mockito.MockitoAnnotations.initMocks
>
> class AggregatorTest {
>     private CamelContext camelContext
>     private ProducerTemplate producerTemplate
>     @Mock
>     private Processor processor
>
>     @BeforeMethod
>     void setup() {
>         initMocks( this )
>
>         camelContext = new DefaultCamelContext()
>
>         camelContext.addRoutes( new TestRouteBuilder() )
>
>         camelContext.start()
>
>         producerTemplate = camelContext.createProducerTemplate()
>         producerTemplate.setDefaultEndpointUri( 'direct:testInput' )
>     }
>
>     @Test
>     void stuckInAggregator() {
>         producerTemplate.sendBody( 'firstBody' )
>         Thread.sleep( 3 * 1000 )
>         new Sender().start()
>         Thread.sleep( 1000 )
>         camelContext.stop()
>
>         def captor = ArgumentCaptor.forClass( Exchange )
>         Mockito.verify( processor ).process( captor.capture() )
>         assert captor.allValues.in.body == ['firstBody\nsecondBody']
>     }
>
>     private class TestRouteBuilder extends RouteBuilder {
>         @Override
>         void configure() throws Exception {
>             from( 'direct:testInput' )
>                     .startupOrder( 2 )
>                     .shutdownRoute( Defer )
>
>                     .delay( 2 * 1000 )
>                     .aggregate( constant( true ), new Aggregator() )
>                     .completionInterval( 10 * 1000 )
>                     .forceCompletionOnStop()
>                     .completionSize( 10 )
>                     .to( 'direct:testOutput' )
>                     .end()
>
>             from( 'direct:testOutput' )
>                     .startupOrder( 1 )
>                     .shutdownRoute( Defer )
>
>                     .process( processor )
>         }
>     }
>
>     private class Aggregator implements AggregationStrategy {
>         @Override
>         Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>             if (oldExchange == null) {
>                 return newExchange
>             }
>             String oldBody = oldExchange.in.getBody(String.class)
>             String newBody = newExchange.in.getBody(String.class)
>             oldExchange.in.body = oldBody + "\n" + newBody //Simply join the
> two existing messages
>             return oldExchange
>         }
>     }
>
>     private class Sender extends Thread {
>         public void run() {
>             producerTemplate.sendBody( 'secondBody' )
>         }
>     }
> }
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Calculation-of-in-flight-messages-in-DefaultShutdownStrategy-tp5747034.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io

Mime
View raw message