activemq-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ARTEMIS-1272) Artemis incorrectly handle MQTT acknowledgement
Date Mon, 10 Jul 2017 16:11:02 GMT

    [ https://issues.apache.org/jira/browse/ARTEMIS-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080569#comment-16080569 ]

ASF GitHub Bot commented on ARTEMIS-1272:
-----------------------------------------

Github user mtaylor commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1389#discussion_r126467111
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.mqtt;
    +
    +import java.util.LinkedList;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
    +import org.awaitility.Awaitility;
    +import org.awaitility.core.ConditionTimeoutException;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +import org.jgroups.util.UUID;
    +import org.junit.After;
    +import org.junit.Test;
    +
    +public class MqttAcknowledgementTest extends MQTTTestSupport {
    +
    +   private volatile LinkedList<Integer> messageIds = new LinkedList<>();
    +   private volatile boolean messageArrived = false;
    +
    +   private MqttClient subscriber;
    +   MqttClient sender;
    +
    +   @After
    +   public void clean() throws MqttException {
    +      messageArrived = false;
    +      messageIds.clear();
    +      if (subscriber.isConnected()) {
    +         subscriber.disconnect();
    +      }
    +      if (sender.isConnected()) {
    +         sender.disconnect();
    +      }
    +      subscriber.close();
    +      sender.close();
    +   }
    +
    +   @Test(timeout = 300000)
    +   public void testAcknowledgementQOS1() throws MqttException {
    +      test(1);
    +   }
    +
    +   @Test(timeout = 300000, expected = ConditionTimeoutException.class)
    +   public void testAcknowledgementQOS0() throws MqttException {
    +      test(0);
    +   }
    +
    +   private void test(int qos) throws MqttException {
    +      String subscriberId = UUID.randomUUID().toString();
    +      String senderId = UUID.randomUUID().toString();
    +      String topic = UUID.randomUUID().toString();
    +
    +      subscriber = createMqttClient(subscriberId);
    +      subscriber.subscribe(topic, qos);
    +
    +      sender = createMqttClient(senderId);
    +      sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false);
    +      sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, false);
    +
    +      Awaitility.await().atMost(5_000, TimeUnit.MILLISECONDS).until(() -> messageIds.size() == 2);
    +
    +      subscriber.messageArrivedComplete(messageIds.getLast(), qos);
    +      subscriber.disconnect();
    +      subscriber.close();
    +      messageArrived = false;
    +
    +      Awaitility.await().atMost(60_000, TimeUnit.MILLISECONDS).until(() -> {
    +         try {
    +            subscriber = createMqttClient(subscriberId);
    +            return true;
    +         } catch (MqttException e) {
    +            return false;
    +         }
    +      });
    --- End diff --
    
    You can reuse the Wait.waitFor(Condition) method to do the same thing:
    
    ```java
          Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisfied() throws Exception {
                try {
                   subscriber = createMqttClient(subscriberId);
                   return true;
                } catch (MqttException e) {
                   return false;
                }
             }
          }, 60000);
    ```
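
    For readers unfamiliar with the pattern, a polling wait of this kind can be
    sketched as below. This is only an illustration: `PollingWait` and its
    `waitFor(BooleanSupplier, long, long)` signature are hypothetical and are not
    the Artemis `Wait` class. The sketch assumes the helper should report a
    timeout through its return value rather than by throwing.

    ```java
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    // Hypothetical helper, NOT the Artemis Wait class: polls a condition until it
    // holds or the timeout elapses.
    public class PollingWait {

       // Returns true as soon as the condition holds, false on timeout or interrupt.
       public static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
          long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
          while (true) {
             if (condition.getAsBoolean()) {
                return true;
             }
             if (System.nanoTime() - deadline >= 0) {
                return false;
             }
             try {
                Thread.sleep(sleepMillis);
             } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
             }
          }
       }

       public static void main(String[] args) {
          long start = System.currentTimeMillis();
          // Becomes true after roughly 50 ms, well inside the 1 s timeout.
          boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 50, 1000, 10);
          System.out.println("condition met: " + ok);
          // Never becomes true, so the call gives up after about 100 ms.
          boolean timedOut = !waitFor(() -> false, 100, 10);
          System.out.println("timed out: " + timedOut);
       }
    }
    ```

    One design difference worth keeping in mind: Awaitility's `until` throws
    `ConditionTimeoutException` when the timeout expires, whereas a helper shaped
    like this sketch returns `false`. A test that relies on
    `expected = ConditionTimeoutException.class` would then need to assert on the
    return value instead.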


> Artemis incorrectly handle MQTT acknowledgement
> -----------------------------------------------
>
>                 Key: ARTEMIS-1272
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1272
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 1.5.3, 2.1.0
>            Reporter: Odyldzhon Toshbekov
>
> When an MQTT client sends an acknowledgement, Artemis also acknowledges the previously sent messages.
> Test case:
> 1. Connect to the broker (client 1)
> 2. Connect to the broker (client 2)
> 3. Send two messages with QoS = 1
> 4. Send an acknowledgement for the second message
> Actual result: Artemis removes both the first and the second message from the queue
> Expected result: Artemis should remove only the second message
> 5. Send an acknowledgement for the first message
> Actual result: WARN message "attempted to Ack already Ack'd message"
> The cause of the problem is in the class MQTTPublisherManager, in the methods "handlePubRec", "handlePubComp", "handlePubAck" and "sendMessage".
> Fix: replace "session.getServerSession().acknowledge" with "session.getServerSession().individualAcknowledge"
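
The difference between the two acknowledgement modes named in the fix can be illustrated with a small self-contained model. This is only a sketch: `AckSketch`, `cumulativeAck`, and `individualAck` are hypothetical names, and the lists stand in for the broker's queue; none of this is Artemis code. It assumes, as the report describes, that `acknowledge` treats an ack as covering all earlier deliveries, whereas `individualAcknowledge` removes only the named message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two acknowledgement modes; not Artemis code.
public class AckSketch {

    // Cumulative mode: removes the acknowledged message and everything delivered before it.
    public static List<Integer> cumulativeAck(List<Integer> delivered, int ackedId) {
        List<Integer> remaining = new ArrayList<>(delivered);
        int idx = remaining.indexOf(ackedId);
        if (idx >= 0) {
            remaining.subList(0, idx + 1).clear();
        }
        return remaining;
    }

    // Individual mode: removes only the acknowledged message.
    public static List<Integer> individualAck(List<Integer> delivered, int ackedId) {
        List<Integer> remaining = new ArrayList<>(delivered);
        remaining.remove(Integer.valueOf(ackedId));
        return remaining;
    }

    public static void main(String[] args) {
        // Two messages delivered in order, as in the test case from the report.
        List<Integer> delivered = Arrays.asList(1, 2);
        // prints: cumulative ack of message 2 leaves []
        System.out.println("cumulative ack of message 2 leaves " + cumulativeAck(delivered, 2));
        // prints: individual ack of message 2 leaves [1]
        System.out.println("individual ack of message 2 leaves " + individualAck(delivered, 2));
    }
}
```

In the cumulative case a later ack for message 1 finds nothing left to acknowledge, which matches the "already Ack'd" warning in the report; in the individual case message 1 is still pending and can be acknowledged normally.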




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
