camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djose <dennyej...@yahoo.com>
Subject Camel -Mqtt Producer and ProducerTemplate
Date Mon, 02 Jan 2017 16:53:55 GMT
HI, 
I have a strange issue with a camel service I created using camel Mqtt
component. 
the code posted have only relevant instructions regarding the problem..... 
Following is a route definition for my producer endpoint to a mosquitto
broker 
// as a Data producer 
    from("direct:reg_start") 
    .startupOrder(5) 
    .routeId("publisher_data") 
    .bean(new PublishDataRouter(dataUrl,"#sslDataContext"),
"getPublishDataRoute") 
    .end(); 

This is a SSL route and the PublishDataRouter class uses a RecepientList to
dynamically route the incoming payload to the correct topic(topic is
specific to an external client) 
>From PublishDataRouter 
routerUrl = "mqtt:register_producer?host="+url+"&clientId=SERVICE_REG_ACK_"+
rand.nextInt(10000)+ 
                                "&sslContext="+sslRegContext+ 
                                "&publishTopicName=registerAck"; 
@RecipientList 
        public String getPublishDataRoute(@Body String body) 

                         RegistrationObject registerClient =
mapper.readValue(body, RegistrationObject.class); 
                         String registerSerial =
String.valueOf(registerClient.getSerialNumber()); 
                         LOG.debug("Sending registration to route :"+
routerUrl); 
                         return
routerUrl+"_"+registerSerial+"&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true";

                        
                
        } 

RecipientList add the correct extension to the topic and return that so the
mqtt component will create a producer to the correct topic. 

Now to activate this route I use a Producer template from my service class 
public class ClientConnectionService { 

@Produce(uri = "direct:reg_start") 
 protected ProducerTemplate template; 
-------- 
------ 
 public void serviceClientConnection(Exchange exchange) { 
if(template == null) 
        template = ctx.createProducerTemplate(); 
Future future = template.asyncCallbackSendBody("direct:reg_start",
renewedValues, 
                               new SynchronizationAdapter() { 
                                        @Override 
                                        public void onComplete(Exchange
exchange) { 
                                             //log success//failure 
                                        } 
                                }); 
        
} 

} 

this works fine for the first time. the ProducerTemplate creates an Endpoint
connection to the broker and 
fire the publisher and data get published to my client. when I check the 
debug 
//relevant trace masked ip and port.... 
+++++++++++++++++++++++++ 
[        hawtdispatch-DEFAULT-1] DefaultExecutorServiceManager  DEBUG
Created new ThreadPool for source:
org.apache.camel.impl.DefaultProducerTemplate@764ec1ae with name:
ProducerTemplate. ->
org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@6f37b367[Running,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
0][ProducerTemplate] 
[) thread #0 - ProducerTemplate] DirectProducer                 DEBUG
Starting producer: Producer[direct://reg_start] 
[) thread #0 - ProducerTemplate] ProducerCache                  DEBUG Adding
to producer cache with key: direct://reg_start for producer:
Producer[direct://reg_start] 
[) thread #0 - ProducerTemplate] ProducerCache                  DEBUG >>>>
direct://reg_start Exchange[] 
[) thread #0 - ProducerTemplate] PublishRegistrationRouter      DEBUG
Sending registration to route
:mqtt:register_producer?host=ssl://xxxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck

[) thread #0 - ProducerTemplate] DefaultComponent               WARN 
Supplied URI
'mqtt:register_producer?host=ssl://xxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true'
contains unsafe characters, please check encoding 
[) thread #0 - ProducerTemplate] DefaultComponent               DEBUG
Creating endpoint
uri=[mqtt:register_producer?host=ssl://xxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true],
path=[register_producer] 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: sslContext on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value:
javax.net.ssl.SSLContext@132ddbab 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: host on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value:
ssl://xxxxxxxxxx:xxxx 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: clientId on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value:
SERVICE_REG_ACK_5799 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: publishTopicName on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value:
registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: connectWaitInSeconds on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value: 30 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: qualityOfService on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value:
exactlyOnce 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: byDefaultRetain on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value: true 
[) thread #0 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: cleanSession on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@76358779 with value: true 
[) thread #0 - ProducerTemplate] DefaultCamelContext            DEBUG
mqtt://register_producer?byDefaultRetain=true&cleanSession=true&clientId=ADA_REG_ACK_5799&connectWaitInSeconds=30&host=ssl%3A%2F%2Fxxxxxxxxxx%3Axxxx&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&qualityOfService=exactlyOnce&sslContext=%23sslRegisterContext
converted to endpoint:
mqtt:register_producer?host=ssl://xxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true
by component: org.apache.camel.component.mqtt.MQTTComponent@6722db6e 
[) thread #0 - ProducerTemplate] MQTTEndpoint                   INFO 
Connecting to ssl://xxxxxxxxxxxxxxx:xxxx using 30 seconds timeout 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Connected to ssl://xxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   WARN  No
topic subscriptions were specified in configuration 
[) thread #0 - ProducerTemplate] MQTTProducer                   DEBUG
Starting producer:
Producer[mqtt:register_producer?host=ssl://xxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true]
[) thread #0 - ProducerTemplate] ProducerCache                  DEBUG Adding
to producer cache with key:
mqtt:register_producer?host=ssl://xxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true
for producer:
Producer[mqtt:register_producer?host=ssl://xxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true]
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   INFO  MQTT
Connection connected to ssl://xxxxxxxxxxxx:xxxx 
[) thread #0 - ProducerTemplate] MQTTProducer                   DEBUG
Publishing to registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Publishing to ssl://xxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] ClientRegisterService          INFO 
Registration finished for client : T00BE187672184R7NUAAJ6KV5LHQV3LU 


+++++++++++++++++ 
Please notice that the MQTT producer connects to the endpoint first here and
then publish 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   INFO  MQTT
Connection connected to ssl://xxxxxxxxxxxx:xxxx 
[) thread #0 - ProducerTemplate] MQTTProducer                   DEBUG
Publishing to registerAck_T00BE187672184R7NUAAJ6KV5LHQV3LU 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Publishing to ssl://xxxxxxxxxxxx:xxxx 

This will publish my data to the broker and the broker will forward data to
my client. Now the broker has establish a connection to that client 

Now the second time I do publish through the same channel but using a
different clientid and different topic without restarting the service
code(this code is a server service which respond to different clients on
some data request) 
The log from the second request 

++++++++++++++++++++++++++++++++ 
[) thread #3 - ProducerTemplate] ProducerCache                  DEBUG >>>>
direct://reg_start Exchange[] 
[) thread #3 - ProducerTemplate] PublishRegistrationRouter      DEBUG
Sending registration to route
:mqtt:register_producer?host=ssl://xxxxxxxxxx:xxxx&clientId=RegistrationObject
registerClient =
SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck 
[) thread #3 - ProducerTemplate] DefaultComponent               WARN 
Supplied URI
'mqtt:register_producer?host=ssl://xxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true'
contains unsafe characters, please check encoding 
[) thread #3 - ProducerTemplate] DefaultComponent               DEBUG
Creating endpoint
uri=[mqtt:register_producer?host=ssl://xxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true],
path=[register_producer] 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: sslContext on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value:
javax.net.ssl.SSLContext@132ddbab 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: host on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value:
ssl://xxxxxxxxx:xxxx 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: clientId on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value:
SERVICE_REG_ACK_5799 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: publishTopicName on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value:
registerAck_T00BE187672184JVORCL7QKI6O815RMT 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: connectWaitInSeconds on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value: 30 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: qualityOfService on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value:
exactlyOnce 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: byDefaultRetain on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value: true 
[) thread #3 - ProducerTemplate] IntrospectionSupport           DEBUG
Configured property: cleanSession on bean:
org.apache.camel.component.mqtt.MQTTConfiguration@399c5e34 with value: true 
[) thread #3 - ProducerTemplate] DefaultCamelContext            DEBUG
mqtt://register_producer?byDefaultRetain=true&cleanSession=true&clientId=ADA_REG_ACK_5799&connectWaitInSeconds=30&host=ssl%3A%2F%2Fxxxxxxxxx%3Axxxx&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&qualityOfService=exactlyOnce&sslContext=%23sslRegisterContext
converted to endpoint:
mqtt:register_producer?host=ssl://xxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true
by component: org.apache.camel.component.mqtt.MQTTComponent@6722db6e 
[) thread #3 - ProducerTemplate] MQTTEndpoint                   INFO 
Connecting to ssl://xxxxxxxxxx:xxxx using 30 seconds timeout 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Connected to ssl://xxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   WARN  No
topic subscriptions were specified in configuration 
[) thread #3 - ProducerTemplate] MQTTProducer                   DEBUG
Starting producer:
Producer[mqtt:register_producer?host=ssl://xxxxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true]
[) thread #3 - ProducerTemplate] ProducerCache                  DEBUG Adding
to producer cache with key:
mqtt:register_producer?host=ssl://xxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true
for producer:
Producer[mqtt:register_producer?host=ssl://xxxxxxxxxxxxxx:xxxx&clientId=SERVICE_REG_ACK_5799&sslContext=#sslRegisterContext&publishTopicName=registerAck_T00BE187672184JVORCL7QKI6O815RMT&connectWaitInSeconds=30&qualityOfService=exactlyOnce&byDefaultRetain=true&cleanSession=true]
[) thread #3 - ProducerTemplate] MQTTProducer                   DEBUG
Publishing to registerAck_T00BE187672184JVORCL7QKI6O815RMT 
[) thread #3 - ProducerTemplate] MQTTEndpoint                   WARN  #1
attempt to re-create connection to ssl://xxxxxxxxxxxxxx:xxxx before
publishing 
[) thread #3 - ProducerTemplate] MQTTEndpoint                   INFO 
Connecting to ssl://xxxxxxxxxxx:xxxx using 30 seconds timeout 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   INFO  MQTT
Connection connected to ssl://xxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG MQTT
Connection disconnected from ssl://xxxxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Connected to ssl://xxxxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   WARN  No
topic subscriptions were specified in configuration 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   INFO  MQTT
Connection connected to ssl://xxxxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   INFO  MQTT
Connection connected to ssl://xxxxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG
Publishing to ssl://xxxxxxxxxxxxxxxxxx:xxxx 
[        hawtdispatch-DEFAULT-1] MQTTEndpoint                   DEBUG MQTT
Connection disconnected from ssl://xxxxxxxxxxxxxxxxxx:xxxx 

+++++++++++++++++++++ 
Notice that the second time instead of going through the connect endpoint ,
MQTT publisher directly try to publish and end in recreating the connection. 
[) thread #3 - ProducerTemplate] MQTTProducer                   DEBUG
Publishing to registerAck_T00BE187672184JVORCL7QKI6O815RMT 
[) thread #3 - ProducerTemplate] MQTTEndpoint                   WARN  #1
attempt to re-create connection to ssl://xxxxxxxxxxxxxx:xxxx before
publishing 
[) thread #3 - ProducerTemplate] MQTTEndpoint                   INFO 
Connecting to ssl://xxxxxxxxxxx:xxxx using 30 seconds timeout 

now since this connection already exist on the broker(the broker log is
below), it try to disconnect the old connection and MQTT reconnects again
and I'm end up in a cycle here. 
>From the code of camel MQTT , I saw a flag is checked in the publish method
for existing connection and a retry loop is established once the flag
returns false. 
code from MQTT component 
    void publish(final String topic, final byte[] payload, final QoS qoS,
final boolean retain, final Callback<Void> callback) throws Exception { 
        // if not connected then create a new connection to re-connect 
        boolean done = isConnected(); 
        int attempt = 0; 
        TimeoutException timeout = null; 
        while (!done && attempt <= PUBLISH_MAX_RECONNECT_ATTEMPTS) { 
            attempt++; 
            try { 
                LOG.warn("#{} attempt to re-create connection to {} before
publishing", attempt, configuration.getHost()); 
                createConnection(); 
                connect(); 

Why in my case the flag says no connection?? already a connection to the
endpoint is established before it goes to publish from the trace... 
Potentially I will have lots of requests coming through that channel and
this can be a real issue. 
here is the log from the mosquitto broker after the issue 

 1483366826: Client SERVICE_REG_ACK_5799 already connected, closing old
connection. 
1483366826: Client SERVICE_REG_ACK_5799 disconnected. 
1483366826: New client connected from xxxxxxxxxxxxxxx as
SERVICE_REG_ACK_5799 (c1, k30, u'xxxxxxxxxxxxx'). 
1483366826: Sending CONNACK to SERVICE_REG_ACK_5799 (0, 0) 
1483366826: OpenSSL Error: error:140F3042:SSL
routines:SSL_UNDEFINED_CONST_FUNCTION:called a function you should not call 
1483366826: Socket error on client <unknown>, disconnecting. 
1483366826: Client SERVICE_REG_ACK_5799 already connected, closing old
connection. 
1483366826: Client SERVICE_REG_ACK_5799 disconnected. 
1483366826: New client connected from xxxxxxxxxxxxx as SERVICE_REG_ACK_5799
(c1, k30, u'xxxxxxxxxxxxx'). 
1483366826: Sending CONNACK to SERVICE_REG_ACK_5799 (0, 0) 
1483366826: OpenSSL Error: error:140F3042:SSL
routines:SSL_UNDEFINED_CONST_FUNCTION:called a function you should not call 
1483366826: Socket error on client <unknown>, disconnecting. 

according to openssl that undefined error occurs when a connection request
come in through the same channel when already connecting request is
pending... 
I think that happens because the frequent connect /disconnect happens from
the camel Mqtt. 
Please advice what I'm doing wrong here and how to resolve this issue?? 
Thanks



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Mqtt-Producer-and-ProducerTemplate-tp5792073.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message