Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7CDBD17C73 for ; Tue, 14 Oct 2014 17:05:35 +0000 (UTC) Received: (qmail 34191 invoked by uid 500); 14 Oct 2014 17:05:34 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 34106 invoked by uid 500); 14 Oct 2014 17:05:34 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 33929 invoked by uid 99); 14 Oct 2014 17:05:34 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Oct 2014 17:05:34 +0000 Date: Tue, 14 Oct 2014 17:05:34 +0000 (UTC) From: "Davy De Waele (JIRA)" To: dev@activemq.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171220#comment-14171220 ] Davy De Waele commented on AMQ-4585: ------------------------------------ I think there is still an issue with the FuseSource client (Even the latest version). It's easy to reproduce the timing issue. See the Github issue for more details. The issue in the latest FuseSource client is that a PUBACK can be processed before the PUB was completed. When the client sends a PUB over the wire it can happen that it immediately starts processing the PUBACK before it finished its internal bookkeeping on the PUB. (FuseSource thinks the PUBACK is an invalid message id because it hasn't fully registered the PUB yet). > MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session > -------------------------------------------------------------------------------------------------------------- > > Key: AMQ-4585 > URL: https://issues.apache.org/jira/browse/AMQ-4585 > Project: ActiveMQ > Issue Type: Bug > Affects Versions: 5.8.0 > Reporter: Pedro Marques > Fix For: 5.10.0 > > Attachments: MQTTTest.java > > > The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case corresponds to receiving several messages from a publisher then closing the subscriber connection and finally reconnecting with setCleanSession(false) and attempt to read the messages published while the subscriber was disconnected. > The exceptions thrown: > {code} > java.net.ProtocolException: Command from server contained an invalid message id: 1 > at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723) > at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762) > at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > {code} > java.lang.ArrayIndexOutOfBoundsException: 0 > at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81) > at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40) > at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749) > at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > {code} > java.net.ProtocolException: Unexpected MQTT command type: 0 > at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775) > at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > No message is shown in the server. The problem doesn't occur always but most of the times the first reconnection attempt is made. With setCleanSession(true) the system works fine. > Code sample (publisher, permanently running): > {code} > MQTT mqtt = new MQTT(); > mqtt.setHost(url); > mqtt.setUserName(user); > mqtt.setPassword(password); > mqtt.setClientId("test_id"); > int i = 0; > while (true) { > BlockingConnection connection = mqtt.blockingConnection(); > connection.connect(); > String message = "TestMessage: " + i; > connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false); > System.out.println("Vendor: Sent message."); > Thread.sleep(2500); > connection.disconnect(); > Thread.sleep(2500); > i++; > } > {code} > Code sample (subscriber, fails multiple times when restarting after the connection is closed): > {code} > BlockingConnection connection = null; > try { > MQTT = new MQTT(); > mqtt.setHost(url); > mqtt.setClientId(clientId); > mqtt.setUserName(user); > mqtt.setPassword(password); > mqtt.setCleanSession(false); > connection = mqtt.blockingConnection(); > connection.connect(); > Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)}; > byte[] qoses = connection.subscribe(topics); > int numMessages = 1; > while (numMessages % 10 != 0) { > Message message = connection.receive(); > byte[] payload = message.getPayload(); > String messageContent = new String(payload); > System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent); > message.ack(); > numMessages++; > } > } finally { > if(connection != null) { > try { > connection.disconnect(); > } catch (Exception e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > } > {code} > The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, on Mosquitto mqtt the code works correctly -- This message was sent by Atlassian JIRA (v6.3.4#6332)