camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Piero Cangianiello (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CAMEL-10229) Race condition when stopping context with autoack=false
Date Tue, 09 Aug 2016 15:50:20 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Piero Cangianiello updated CAMEL-10229:
---------------------------------------
    Description: 
Run the following code and hit enter while one message is in unacked state (see RabbitMQ console):

{code:java}
public static void main(String[] args) throws Exception {
	CamelContext context = new DefaultCamelContext();

	context.addRoutes(new RouteBuilder() {
		@Override
		public void configure() {
			from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
					.delayer(5000)
					.setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
					.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
					.routeId("myRoute");
		}
	});
	context.start();
	new BufferedReader(new InputStreamReader(System.in)).readLine();
	context.stop();
}
{code}

you get the following exception:

{noformat}
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e
(amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1)
threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel
shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0,
method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
	at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

I think that this is caused by a race condition between the main thread that runs channel.close()
immediately after channel.basicCancel(tag) (see org.apache.camel.component.rabbitmq.RabbitConsumer)
without waiting the channel.basicAck(deliveryTag, false) in handleDelivery().

Another bad side effect is that *you'll find a duplicate of a message* on the destinationQueue.
For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing
the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue.

The correct behaviour should be the following:
1) Stop consumer: channel.basicCancel(tag)
2) Wait if there is a running consumer
3) The consumer acks the previous message
4) Close the channel

  was:
Run the following code and hit enter while one message is in unacked state (see RabbitMQ console):

{code:java}
public static void main(String[] args) throws Exception {
	CamelContext context = new DefaultCamelContext();

	context.addRoutes(new RouteBuilder() {
		@Override
		public void configure() {
			from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
					.delayer(5000)
					.setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
					.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
					.routeId("myRoute");
		}
	});
	context.start();
	new BufferedReader(new InputStreamReader(System.in)).readLine();
	context.stop();
}
{code}

you get the following exception:

{noformat}
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e
(amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1)
threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel
shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0,
method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
	at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

I think that this is caused by a race condition between the main thread that runs channel.close();
immediately after channel.basicCancel(tag); (see org.apache.camel.component.rabbitmq.RabbitConsumer)
without waiting the channel.basicAck(deliveryTag, false); in handleDelivery().

Another bad side effect is that you'll find a duplicate of a message on the destinationQueue.
For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing
the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue.

The correct behaviour should be the following:
1) Stop consumer: channel.basicCancel(tag)
2) Wait if there is a running consumer
3) The consumer acks the previous message
4) Close the channel


> Race condition when stopping context with autoack=false
> -------------------------------------------------------
>
>                 Key: CAMEL-10229
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10229
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-rabbitmq
>    Affects Versions: 2.17.3
>            Reporter: Piero Cangianiello
>              Labels: autoack, rabbitmq, stop
>
> Run the following code and hit enter while one message is in unacked state (see RabbitMQ
console):
> {code:java}
> public static void main(String[] args) throws Exception {
> 	CamelContext context = new DefaultCamelContext();
> 	context.addRoutes(new RouteBuilder() {
> 		@Override
> 		public void configure() {
> 			from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
> 					.delayer(5000)
> 					.setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
> 					.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
> 					.routeId("myRoute");
> 		}
> 	});
> 	context.start();
> 	new BufferedReader(new InputStreamReader(System.in)).readLine();
> 	context.stop();
> }
> {code}
> you get the following exception:
> {noformat}
> com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e
(amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1)
threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel
shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0,
method-id=0)
> 	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
> 	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
> 	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
> 	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
> 	at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
> 	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
> 	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> I think that this is caused by a race condition between the main thread that runs channel.close()
immediately after channel.basicCancel(tag) (see org.apache.camel.component.rabbitmq.RabbitConsumer)
without waiting the channel.basicAck(deliveryTag, false) in handleDelivery().
> Another bad side effect is that *you'll find a duplicate of a message* on the destinationQueue.
For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing
the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue.
> The correct behaviour should be the following:
> 1) Stop consumer: channel.basicCancel(tag)
> 2) Wait if there is a running consumer
> 3) The consumer acks the previous message
> 4) Close the channel



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message