activemq-dev mailing list archives

From "Pedro Marques (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
Date Tue, 18 Jun 2013 18:00:23 GMT

    [ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13687013#comment-13687013
] 

Pedro Marques edited comment on AMQ-4585 at 6/18/13 5:59 PM:
-------------------------------------------------------------

I was trying to reproduce the problem in a test, but I could not make the test fail
consistently. I attached the code I was using (testReceivePendingMessages); even with high
values for "numberOfRuns" and "messagesPerRun", the failure only occurs sometimes. I also had
problems with the tests never terminating. I don't know whether that is because I am on
Windows or because of something in my test code, but the tests often hung, especially
after I changed setCleanSession to true.
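
One way to keep a reproduction test from hanging is to bound every blocking call with a timeout. Below is a minimal sketch using only java.util.concurrent; BoundedCall and callWithTimeout are hypothetical names, and the sleeping lambda is only a stand-in for a receive() call that never returns. It is not code from the attached MQTTTest.java.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds a blocking call so a test cannot hang forever. */
final class BoundedCall {
    /** Runs the call on a daemon thread; returns fallback if it does not finish in time. */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMillis, T fallback) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the blocked worker thread
            return fallback;
        } catch (ExecutionException e) {
            // Rethrow the original failure so the test still sees it.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a receive() that never returns.
        String result = callWithTimeout(() -> {
            Thread.sleep(60_000);
            return "message";
        }, 200, "timed out");
        System.out.println(result); // prints "timed out"
    }
}
```

In a JUnit 4 test, @Test(timeout = ...) gives a similar guarantee at the level of the whole test method.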
                
> MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>         Attachments: MQTTTest.java
>
>
> The system throws at least three different types of exceptions when a subscriber receives
> the first pending message without cleaning the session. The test case is: receive several
> messages from a publisher, close the subscriber connection, then reconnect with
> setCleanSession(false) and attempt to read the messages published while the subscriber
> was disconnected.
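
For readers unfamiliar with the clean-session flag, the expected behaviour in this scenario can be modelled with a toy in-memory broker. This is only a sketch of the MQTT session semantics under simplifying assumptions (one topic, every known client subscribed, no QoS handling); ToySessionBroker and its methods are hypothetical and are not ActiveMQ or fusesource code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of MQTT session persistence; not ActiveMQ code. */
final class ToySessionBroker {
    private static final class Session {
        boolean connected;
        boolean clean;
        final Queue<String> pending = new ArrayDeque<>();
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** Connects a client and returns the messages queued while it was away. */
    List<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId); // a clean session discards stored state
        }
        Session s = sessions.computeIfAbsent(clientId, id -> new Session());
        s.connected = true;
        s.clean = cleanSession;
        List<String> backlog = new ArrayList<>(s.pending);
        s.pending.clear();
        return backlog;
    }

    /** Disconnects a client; only non-clean sessions survive the disconnect. */
    void disconnect(String clientId) {
        Session s = sessions.get(clientId);
        if (s == null) {
            return;
        }
        if (s.clean) {
            sessions.remove(clientId);
        } else {
            s.connected = false;
        }
    }

    /** Queues the message for every known client that is currently offline. */
    void publish(String message) {
        for (Session s : sessions.values()) {
            if (!s.connected) {
                s.pending.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToySessionBroker broker = new ToySessionBroker();
        broker.connect("sub", false);
        broker.disconnect("sub");
        broker.publish("TestMessage: 0");
        broker.publish("TestMessage: 1");
        System.out.println(broker.connect("sub", false)); // [TestMessage: 0, TestMessage: 1]
        broker.disconnect("sub");
        broker.publish("TestMessage: 2");
        System.out.println(broker.connect("sub", true)); // []
    }
}
```

The failures reported here occur on the first path, where the backlog is delivered after a reconnect with setCleanSession(false).
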
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message id: 1
> 	at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
> 	at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> No error message is shown on the server. The problem does not always occur, but it happens
> most of the time on the first reconnection attempt. With setCleanSession(true) the system
> works fine.
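
As background for the third trace ("Unexpected MQTT command type: 0"): in MQTT 3.1 the command type is the upper four bits of the first fixed-header byte, and the value 0 is reserved. A type of 0 is therefore consistent with the client reading a byte that is not the start of a valid frame, although that alone does not prove where the fault lies. The sketch below decodes that byte; FixedHeaderByte is a hypothetical helper, not the fusesource codec.

```java
/** Hypothetical helper that decodes the first byte of an MQTT 3.1 fixed header. */
final class FixedHeaderByte {
    // Message types as numbered in the MQTT 3.1 specification; 0 and 15 are reserved.
    private static final String[] TYPE_NAMES = {
        "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP",
        "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT",
        "RESERVED"
    };

    /** Bits 7-4: message type. */
    static int type(int b) {
        return (b >> 4) & 0x0F;
    }

    /** Bit 3: duplicate delivery flag. */
    static boolean dup(int b) {
        return (b & 0x08) != 0;
    }

    /** Bits 2-1: QoS level. */
    static int qos(int b) {
        return (b >> 1) & 0x03;
    }

    /** Bit 0: retain flag. */
    static boolean retain(int b) {
        return (b & 0x01) != 0;
    }

    static String describe(int b) {
        return String.format("%s dup=%b qos=%d retain=%b",
                TYPE_NAMES[type(b)], dup(b), qos(b), retain(b));
    }

    public static void main(String[] args) {
        System.out.println(describe(0x34)); // PUBLISH dup=false qos=2 retain=false
        System.out.println(describe(0x50)); // PUBREC dup=false qos=0 retain=false
        System.out.println(describe(0x00)); // RESERVED dup=false qos=0 retain=false
    }
}
```
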
> Code sample (publisher, permanently running):
> {code}
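> import org.fusesource.mqtt.client.BlockingConnection;
> import org.fusesource.mqtt.client.MQTT;
> import org.fusesource.mqtt.client.QoS;
>
> // url, user and password are defined elsewhere.
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
> 	// Each iteration: connect, publish one QoS 1 message, wait, disconnect, wait.
> 	BlockingConnection connection = mqtt.blockingConnection();
> 	connection.connect();
> 	String message = "TestMessage: " + i;
> 	connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
> 	System.out.println("Vendor: Sent message.");
> 	Thread.sleep(2500);
> 	connection.disconnect();
> 	Thread.sleep(2500);
> 	i++;
> }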
> {code}
> Code sample (subscriber, fails multiple times when restarted after the connection is closed):
> {code}
> import org.fusesource.mqtt.client.BlockingConnection;
> import org.fusesource.mqtt.client.MQTT;
> import org.fusesource.mqtt.client.Message;
> import org.fusesource.mqtt.client.QoS;
> import org.fusesource.mqtt.client.Topic;
>
> // url, clientId, user and password are defined elsewhere.
> BlockingConnection connection = null;
> try {
>     MQTT mqtt = new MQTT();
>     mqtt.setHost(url);
>     mqtt.setClientId(clientId);
>     mqtt.setUserName(user);
>     mqtt.setPassword(password);
>     // Keep the session so messages published while offline are delivered on reconnect.
>     mqtt.setCleanSession(false);
>     connection = mqtt.blockingConnection();
>     connection.connect();
>     Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
>     byte[] qoses = connection.subscribe(topics);
>     // Receive and acknowledge nine messages, then fall through to disconnect.
>     int numMessages = 1;
>     while (numMessages % 10 != 0) {
>         Message message = connection.receive();
>         byte[] payload = message.getPayload();
>         String messageContent = new String(payload);
>         System.out.println("Received message from topic: " + message.getTopic()
>                 + " Message content: " + messageContent);
>         message.ack();
>         numMessages++;
>     }
> } finally {
>     if (connection != null) {
>         try {
>             connection.disconnect();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
> {code}
> The test failed when using the current fusesource client (1.5) against ActiveMQ 5.9. Against
> Mosquitto, the same code works correctly.

