camel-issues mailing list archives

From "Davy De Waele (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CAMEL-7922) MQTT endpoint misses QoS > 0 messages due to startup timing issue
Date Fri, 17 Oct 2014 06:51:33 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davy De Waele updated CAMEL-7922:
---------------------------------
    Description: 
When the MQTT endpoint is started, the MQTT connection is established immediately, causing
an immediate influx of persisted messages (those put on the topic while the client was not available).


The issue is that at this point, most likely no consumers are available yet to process these messages.


*Receiving a PUBLISH message*

PUBLISH messages are received while no consumers are registered. Result: messages with QoS > 0 that were
put on the topic while the client was not connected are never processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in CallbackConnection))

	CallbackConnection.toReceiver(PUBLISH) line: 815	
	CallbackConnection.processFrame(MQTTFrame) line: 732	
	CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51	
	CallbackConnection$6.onTransportCommand(Object) line: 392	
	TcpTransport.drainInbound() line: 709	
	TcpTransport$6.run() line: 588	
	NioDispatchSource$3.run() line: 209	
	SerialDispatchQueue.run() line: 100	
	SimpleThread.run() line: 77	
{noformat}
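
The effect can be reproduced in miniature without a broker. The sketch below is not the camel-mqtt code: EagerEndpointSketch, onPublish, addConsumer and droppedCount are hypothetical names, and the assumption that an undeliverable message is simply discarded is made only for illustration. It shows that a message handed to the receive callback before any consumer is registered has nowhere to go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for an endpoint whose connection is live before consumers exist. */
final class EagerEndpointSketch {
    private final List<Consumer<String>> consumers = new CopyOnWriteArrayList<>();
    private int dropped; // single-threaded sketch, so a plain int is enough

    /** Simulates the client's receive callback for one PUBLISH. */
    void onPublish(String payload) {
        if (consumers.isEmpty()) {
            dropped++; // assumption: with nobody registered, the message is lost
            return;
        }
        for (Consumer<String> consumer : consumers) {
            consumer.accept(payload);
        }
    }

    void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
    }

    int droppedCount() {
        return dropped;
    }
}

final class EagerEndpointDemo {
    public static void main(String[] args) {
        EagerEndpointSketch endpoint = new EagerEndpointSketch();
        List<String> received = new ArrayList<>();

        endpoint.onPublish("persisted-1");   // arrives before any consumer is registered
        endpoint.addConsumer(received::add); // consumer registration completes later
        endpoint.onPublish("live-1");        // arrives after registration

        // prints: received=[live-1] dropped=1
        System.out.println("received=" + received + " dropped=" + endpoint.droppedCount());
    }
}
```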

*No consumers registered yet*

Only when the consumer registration in the trace below finishes will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in MQTTEndpoint))

	owns: SpringCamelContext  (id=92)	
	owns: Object  (id=143)	
	owns: StandardContext  (id=144)	
	MQTTEndpoint.addConsumer(MQTTConsumer) line: 164	
	MQTTConsumer.doStart() line: 35	
	MQTTConsumer(ServiceSupport).start() line: 61	
	SpringCamelContext(DefaultCamelContext).startService(Service) line: 2158	
	SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean, boolean) line: 2452	
	SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean) line: 2388	
	SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection<RouteService>) line: 2318	
	SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>, boolean, boolean, boolean, boolean) line: 2091	
	SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951	
	SpringCamelContext(DefaultCamelContext).doStart() line: 1777	
{noformat}

These messages will never be picked up.

Perhaps it should be the responsibility of the consumer / producer to start the connection when
it gets attached to the endpoint?
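
One way to read this suggestion is sketched below, again with hypothetical names (LazyEndpointSketch and its connectAction callback are not part of camel-mqtt, and this is not a proposed patch). The assumption is that starting the endpoint opens no connection, and that the first call to addConsumer registers the consumer and only then connects, so that any backlog delivered on connect finds a consumer in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical endpoint that defers connecting until a consumer is attached. */
final class LazyEndpointSketch {
    private final List<Consumer<String>> consumers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final Consumer<LazyEndpointSketch> connectAction;

    /** connectAction stands in for "open the MQTT connection and subscribe". */
    LazyEndpointSketch(Consumer<LazyEndpointSketch> connectAction) {
        this.connectAction = connectAction;
    }

    /** Starting the endpoint deliberately opens no connection in this sketch. */
    void start() {
    }

    void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);                     // register first ...
        if (connected.compareAndSet(false, true)) {
            connectAction.accept(this);              // ... then connect, exactly once
        }
    }

    boolean isConnected() {
        return connected.get();
    }

    /** Simulates the client's receive callback for one PUBLISH. */
    void onPublish(String payload) {
        for (Consumer<String> consumer : consumers) {
            consumer.accept(payload);
        }
    }
}

final class LazyEndpointDemo {
    public static void main(String[] args) {
        // On connect, the simulated broker immediately replays one persisted message.
        LazyEndpointSketch endpoint = new LazyEndpointSketch(e -> e.onPublish("persisted-1"));
        List<String> received = new ArrayList<>();

        endpoint.start();
        System.out.println("connected after start: " + endpoint.isConnected());

        endpoint.addConsumer(received::add);
        System.out.println("received after addConsumer: " + received);
    }
}
```

A producer-only endpoint would need an equivalent trigger on first send; that case is left out of the sketch.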


  was:
The FuseSource client can start receiving messages as a result of a topic subscription BEFORE
any consumers have been registered on the MQTT endpoint. When the MQTT endpoint is started,
the MQTT connection is established, causing an immediate influx of persisted messages. The issue
is that at this point, no consumers are available yet to process these messages.

*Receiving a PUBLISH message*

PUBLISH messages are received while no consumers are registered. Result: messages with QoS > 0 are never
processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in CallbackConnection))

	CallbackConnection.toReceiver(PUBLISH) line: 815	
	CallbackConnection.processFrame(MQTTFrame) line: 732	
	CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51	
	CallbackConnection$6.onTransportCommand(Object) line: 392	
	TcpTransport.drainInbound() line: 709	
	TcpTransport$6.run() line: 588	
	NioDispatchSource$3.run() line: 209	
	SerialDispatchQueue.run() line: 100	
	SimpleThread.run() line: 77	
{noformat}

*No consumers registered yet*

Only when this finishes will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in MQTTEndpoint))

	owns: SpringCamelContext  (id=92)	
	owns: Object  (id=143)	
	owns: StandardContext  (id=144)	
	MQTTEndpoint.addConsumer(MQTTConsumer) line: 164	
	MQTTConsumer.doStart() line: 35	
	MQTTConsumer(ServiceSupport).start() line: 61	
	SpringCamelContext(DefaultCamelContext).startService(Service) line: 2158	
	SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean, boolean) line: 2452	
	SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean) line: 2388	
	SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection<RouteService>) line: 2318	
	SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>, boolean, boolean, boolean, boolean) line: 2091	
	SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951	
	SpringCamelContext(DefaultCamelContext).doStart() line: 1777	
{noformat}

These messages will never be picked up until Camel is restarted.


> MQTT endpoint misses QoS > 0 messages due to startup timing issue
> -----------------------------------------------------------------
>
>                 Key: CAMEL-7922
>                 URL: https://issues.apache.org/jira/browse/CAMEL-7922
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.14.0
>            Reporter: Davy De Waele
>            Assignee: Willem Jiang
>


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
