camel-issues mailing list archives

From "Ralf Kornberger (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CAMEL-6717) dead lock when processing fetching/sending messages at high frequency
Date Mon, 09 Sep 2013 11:58:51 GMT
Ralf Kornberger created CAMEL-6717:
--------------------------------------

             Summary: dead lock when processing fetching/sending messages at high frequency
                 Key: CAMEL-6717
                 URL: https://issues.apache.org/jira/browse/CAMEL-6717
             Project: Camel
          Issue Type: Bug
          Components: camel-mqtt
    Affects Versions: 2.11.0
         Environment: jdk 1.6.32
            Reporter: Ralf Kornberger


I'm using Apache Camel with MQTT to fetch data from a Mosquitto broker. Several devices publish
data there at high frequency (< 10s). After receiving the data, I send an acknowledgement message
back by publishing a message to a topic for each device. I'm using the Fusesource MQTT client
(version 2.5) for this.
I encountered the following problem: after some time (anywhere from 15 minutes to 1 day), something
"weird" happens: the application stops receiving or sending any data via MQTT. Looking at it
with jstack reveals the following:


"hawtdispatch-DEFAULT-2" daemon prio=10 tid=0x00007facc1a2f000 nid=0x782d waiting on condition [0x00007fac42bcf000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000078e792b88> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
	at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
	at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:73)
	at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:82)
	at net.centersight.plugins.agent.protomqtt.comm.MQTTManager.sendACKMessage(MQTTManager.java:92)
	at net.centersight.plugins.agent.protomqtt.comm.MQTTCommunication.sendACKMessage(MQTTCommunication.java:116)
	at net.centersight.plugins.agent.protomqtt.camel.AgentMQTTbatchACKer.process(AgentMQTTbatchACKer.java:47)
	at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:341)
	at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:238)
	at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:166)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:334)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
	at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
	at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
	at org.apache.camel.component.mqtt.MQTTConsumer.processExchange(MQTTConsumer.java:46)
	at org.apache.camel.component.mqtt.MQTTEndpoint$1.onPublish(MQTTEndpoint.java:88)
	at org.fusesource.mqtt.client.CallbackConnection.toReceiver(CallbackConnection.java:815)
	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:732)
	at org.fusesource.mqtt.client.CallbackConnection.access$17(CallbackConnection.java:727)
	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

   Locked ownable synchronizers:
	- None
	
Apparently, both the Camel receiving thread and the Fusesource client thread are hanging in
org.fusesource.mqtt.client.Promise.await(Promise.java:88)
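
One possible reading of the trace above is that the blocking publish runs on the same dispatch thread that would have to deliver its completion. That reading is an assumption, not something the dump proves. The following JDK-only sketch shows the general pattern: a single-threaded executor stands in for a serial dispatch queue, and a CountDownLatch stands in for the latch inside Promise. The class and method names are hypothetical, and no mqtt-client or Camel API is used.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SerialQueueStall {

    /**
     * Runs a "callback" on a single-threaded executor that blocks on a latch.
     * The only task that releases the latch is queued on the same executor,
     * so it cannot run until the callback returns.
     * Returns true if the wait completed, false if it timed out.
     */
    static boolean blockInsideCallback(long timeoutMillis) throws Exception {
        // Stand-in for a serial dispatch queue (assumption, not HawtDispatch itself).
        ExecutorService serialQueue = Executors.newSingleThreadExecutor();
        try {
            final CountDownLatch ack = new CountDownLatch(1);
            Future<Boolean> callback = serialQueue.submit(() -> {
                // The completion is queued behind the task that waits for it.
                serialQueue.execute(ack::countDown);
                // An untimed await() here would never return; the timed
                // variant gives up after timeoutMillis and reports false.
                return ack.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return callback.get();
        } finally {
            serialQueue.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("wait completed: " + blockInsideCallback(200));
    }
}
```

In this sketch the wait always times out, because the releasing task can only run after the waiting task has finished. Whether the real client behaves the same way depends on how its queues are shared, which the report does not establish.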

Since I use BlockingConnection in my sending client, I took a look at the Fusesource MQTT
client.
In BlockingConnection.java, in the method
public void publish(final UTF8Buffer topic, final Buffer payload, final QoS qos, final boolean
retain) throws Exception

a Future is obtained from the publish call in line 80, and await() is then called on it.
When I change this await() to await(30L, TimeUnit.SECONDS), the problem still occurs, but
the application keeps working.
I've put debug printouts in the trace class, which show that at the time the problem
occurs the MQTT client seems to lose the connection to the broker and tries to re-establish
it. The debug logs also show that the timeout exception thrown by the timed await occurs every
minute for about 20 minutes. Then the problem "vanishes" and returns after several hours.
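
The effect of replacing an untimed await() with a timed one can be sketched with plain JDK classes. According to the stack trace, Promise.await ends up in CountDownLatch.await, so a CountDownLatch is used here as a stand-in. The class name TimedAck and the helper awaitAck are hypothetical and are not part of mqtt-client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedAck {

    /**
     * Waits for the latch to be released, but only for a bounded time.
     * Throws TimeoutException instead of parking the calling thread forever.
     */
    static void awaitAck(CountDownLatch ack, long amount, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!ack.await(amount, unit)) {
            throw new TimeoutException("no acknowledgement within " + amount + " " + unit);
        }
    }

    public static void main(String[] args) throws Exception {
        // Case 1: the acknowledgement has already arrived, so the wait returns at once.
        CountDownLatch delivered = new CountDownLatch(1);
        delivered.countDown();
        awaitAck(delivered, 100, TimeUnit.MILLISECONDS);
        System.out.println("acknowledged");

        // Case 2: the acknowledgement never arrives (for example, the
        // connection is lost); the caller gets an exception and can continue.
        CountDownLatch lost = new CountDownLatch(1);
        try {
            awaitAck(lost, 100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("gave up waiting");
        }
    }
}
```

A timeout like this frees the waiting thread but does not remove the underlying cause, which matches the observation that the problem still occurs while the application keeps working.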

PS: I also posted this on GitHub, in the Fusesource MQTT issue tracker:
https://github.com/fusesource/mqtt-client/issues/21#issuecomment-23861700


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
