activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Davy De Waele (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
Date Tue, 14 Oct 2014 17:05:34 GMT

    [ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171220#comment-14171220
] 

Davy De Waele commented on AMQ-4585:
------------------------------------

I think there is still an issue with the FuseSource client (Even the latest version). It's
easy to reproduce the timing issue. See the Github issue for more details. The issue in the
latest FuseSource client is that a PUBACK can be processed before the PUB was completed.

When the client sends a PUB over the wire it can happen that it immediately starts processing
the PUBACK before it finished its internal bookkeeping on the PUB. (FuseSource thinks the
PUBACK is an invalid message id because it hasn't fully registered the PUB yet).

> MQTT BlockingConnection.receive fails when receiving pending messages after reconnect
without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>             Fix For: 5.10.0
>
>         Attachments: MQTTTest.java
>
>
> The system throws at least three different types of exceptions when a subscriber receives
the first pending message without cleaning the session. The test case corresponds to receiving
several messages from a publisher then closing the subscriber connection and finally reconnecting
with setCleanSession(false) and attempt to read the messages published while the subscriber
was disconnected.
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message id: 1
> 	at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
> 	at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> No message is shown in the server. The problem doesn't occur always but most of the times
the first reconnection attempt is made. With setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
> 	BlockingConnection connection = mqtt.blockingConnection();
> 	connection.connect();
> 	String message = "TestMessage: " + i;
> 	connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
> 	System.out.println("Vendor: Sent message.");
> 	Thread.sleep(2500);
> 	connection.disconnect();
> 	Thread.sleep(2500);
> 	i++;
> }
> {code}
> Code sample (subscriber, fails multiple times when restarting after the connection is
closed):
> {code}
> BlockingConnection connection = null;
> try {
>     MQTT = new MQTT();
>     mqtt.setHost(url);
>     mqtt.setClientId(clientId);
>     mqtt.setUserName(user);
>     mqtt.setPassword(password);
>     mqtt.setCleanSession(false);
>     connection = mqtt.blockingConnection();
>     connection.connect();
>     Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
>     byte[] qoses = connection.subscribe(topics);
>     int numMessages = 1;
>     while (numMessages % 10 != 0) {
>         Message message = connection.receive();
>         byte[] payload = message.getPayload();
>         String messageContent = new String(payload);
>         System.out.println("Received message from topic: " + message.getTopic() + " Message
content: " + messageContent);
>         message.ack();
>         numMessages++;
>     }
> } finally {
>     if(connection != null) {
>         try {
>             connection.disconnect();
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, on Mosquitto
mqtt the code works correctly



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message