activemq-dev mailing list archives

From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
Date Wed, 19 Jun 2013 20:54:24 GMT

    [ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13688443#comment-13688443 ]

Timothy Bish commented on AMQ-4585:
-----------------------------------

The problem is a tricky one, and it looks like there might be an issue in the MQTT client.
The failure happens on subscribe: for some reason the client receives two SUBACK frames when
the broker appears to have sent only one.  You can see it in this trace log, where a SUBSCRIBE
followed by a PUBLISH results in the client reporting two SUBACK frames, and the second one
causes the client to bail.

{noformat}
2013-06-19 16:50:40,556 | DEBUG | MQTTProtocolConverter          | MQTT Client MQTT-Sub-Client connected.
2013-06-19 16:50:40,557 | INFO  | MQTTTest                       | Client Received: MQTTFrame { type: CONNACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,557 | INFO  | MQTTTest                       | MQTT login accepted
2013-06-19 16:50:40,557 | INFO  | MQTTTest                       | Client Sent: MQTTFrame { type: SUBSCRIBE, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,559 | TRACE | MQTTIO                         | Received: MQTTFrame { type: SUBSCRIBE, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,562 | TRACE | MQTTIO                         | Sending: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | TRACE | MQTTIO                         | Sending: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO  | MQTTTest                       | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO  | MQTTTest                       | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO  | MQTTTest                       | Fatal connection failure: %s
java.net.ProtocolException: Command from server contained an invalid message id: 1
	at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2013-06-19 16:50:40,563 | INFO  | MQTTTest                       | Client Received: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,564 | INFO  | BrokerService                  | Apache ActiveMQ 5.9-SNAPSHOT (localhost, ID:OfficePC-37216-1371675037999-0:1) is shutting down
{noformat}
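
To illustrate why a second SUBACK would be fatal, here is a minimal sketch of a pending-request table keyed by message id. It is only a model built from the exception text in the trace: the class and method names (PendingRequests, sent, completeRequest, outstanding, DuplicateSubackDemo) are hypothetical, and the real CallbackConnection may track requests differently.

```java
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a client's pending-request table.
// This is NOT the fusesource mqtt-client implementation.
class PendingRequests {
    private final Map<Short, String> pending = new HashMap<>();

    // Record a request (for example a SUBSCRIBE) that awaits an acknowledgement.
    void sent(short messageId, String description) {
        pending.put(messageId, description);
    }

    // Complete the request matching an incoming ack (for example a SUBACK).
    // An ack whose id has no pending request is treated as a protocol error.
    String completeRequest(short messageId) throws ProtocolException {
        String request = pending.remove(messageId);
        if (request == null) {
            throw new ProtocolException(
                "Command from server contained an invalid message id: " + messageId);
        }
        return request;
    }

    // Number of requests still waiting for an acknowledgement.
    int outstanding() {
        return pending.size();
    }
}

class DuplicateSubackDemo {
    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        requests.sent((short) 1, "SUBSCRIBE TopicA");
        try {
            // The first SUBACK for id 1 completes the pending SUBSCRIBE.
            System.out.println("completed: " + requests.completeRequest((short) 1));
            // A second SUBACK with the same id finds nothing pending and fails.
            requests.completeRequest((short) 1);
        } catch (ProtocolException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Under this model the first SUBACK removes the entry for message id 1, so a second frame decoded as a SUBACK with the same id has nothing to complete. That matches the "invalid message id: 1" error in the trace, although it does not explain where the second frame comes from.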

                
> MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>         Attachments: MQTTTest.java
>
>
> The system throws at least three different types of exceptions when a subscriber receives
> the first pending message without cleaning the session. The test case receives several
> messages from a publisher, closes the subscriber connection, and then reconnects with
> setCleanSession(false) and attempts to read the messages published while the subscriber
> was disconnected.
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message id: 1
> 	at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
> 	at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> Nothing is logged on the server. The problem does not always occur, but it does occur most
> of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
> 	BlockingConnection connection = mqtt.blockingConnection();
> 	connection.connect();
> 	String message = "TestMessage: " + i;
> 	connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
> 	System.out.println("Vendor: Sent message.");
> 	Thread.sleep(2500);
> 	connection.disconnect();
> 	Thread.sleep(2500);
> 	i++;
> }
> {code}
> Code sample (subscriber, fails multiple times when restarting after the connection is closed):
> {code}
> BlockingConnection connection = null;
> try {
>     MQTT mqtt = new MQTT();
>     mqtt.setHost(url);
>     mqtt.setClientId(clientId);
>     mqtt.setUserName(user);
>     mqtt.setPassword(password);
>     mqtt.setCleanSession(false);
>     connection = mqtt.blockingConnection();
>     connection.connect();
>     Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
>     byte[] qoses = connection.subscribe(topics);
>     int numMessages = 1;
>     while (numMessages % 10 != 0) {
>         Message message = connection.receive();
>         byte[] payload = message.getPayload();
>         String messageContent = new String(payload);
>         System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
>         message.ack();
>         numMessages++;
>     }
> } finally {
>     if(connection != null) {
>         try {
>             connection.disconnect();
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
> }
> {code}
> The test failed when using the current fusesource client (1.5) against ActiveMQ 5.9. Against
> Mosquitto MQTT the same code works correctly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
