activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-4576) MQTT BlockingConnection.receive fails when subscribing multiple topics
Date Wed, 12 Jun 2013 21:12:20 GMT

    [ https://issues.apache.org/jira/browse/AMQ-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681640#comment-13681640
] 

Timothy Bish commented on AMQ-4576:
-----------------------------------

This will probably need a bit of thought.  The broker is doing what it was designed to do
here, allowing only one durable subscriber with the same client Id and subscription name.
 In the case of the MQTT code as it currently stands when the subscribe occurs the code uses
the client Id as the subscription name as well. 
                
> MQTT BlockingConnection.receive fails when subscribing multiple topics
> ----------------------------------------------------------------------
>
>                 Key: AMQ-4576
>                 URL: https://issues.apache.org/jira/browse/AMQ-4576
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>
> When more than one topic is supplied to BlockingConnection.subscribe the BlockingConnection.receive
fails and the following exception is thrown:
> {code}
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
> 	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> On the server shows the following messages:
> {code}
> 2013-06-06 15:06:00,125 WARN  [org.apache.activemq.transport.mqtt.MQTTProtocolConverter]
(ActiveMQ BrokerService[localhost] Task-1) Exception occurred processing: 
> null: javax.jms.JMSException: Durable consumer is in use for client: 6056@3232261834SOC
and subscriptionName: 6056@3232261834SOC
> 2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection] (ActiveMQ
Transport: tcp:///127.0.0.1:53389@1883) Failed to add Connection ID:LTD-SFW004-53303-1370527418664-2:14,
reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: 6056@3232261834SOC
already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection.Transport]
(ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: tcp://127.0.0.1:53389
failed: java.io.IOException: Broker: localhost - Client: 6056@3232261834SOC already connected
from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 ERROR [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] (DefaultQuartzScheduler_Worker-8)
Problem receiving mqtt messages: java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
> 	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
[:1.5-SNAPSHOT]
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
[:1.17]
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) [:1.17]
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
[:1.17]
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
[:1.17]
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
[:1.17]
> {code}
> Code example:
> {code}
> MQTT = new MQTT();
> mqtt.setHost(url);
> mqtt.setClientId(clientId);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setCleanSession(false);
> BlockingConnection connection = mqtt.blockingConnection();
> connection.connect();
> Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)};
> byte[] qoses = connection.subscribe(topics);
> while (true) {
>     Message message = connection.receive();
>     byte[] payload = message.getPayload();
>     String messageContent = new String(payload);
>     System.out.println("Received message from topic: " + message.getTopic() + " Message
content: " + messageContent);
>     message.ack();
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, on Mosquitto
mqtt the code works correctly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message