camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ra...@apache.org
Subject [2/2] camel git commit: CAMEL-9092 MQTT consumer receives duplicate messages after broker restart.
Date Fri, 28 Aug 2015 14:58:06 GMT
CAMEL-9092 MQTT consumer receives duplicate messages after broker restart.

With thanks to Tomohisa Igarashi. Code merged with modifications.

This closes #601.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ac31039c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ac31039c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ac31039c

Branch: refs/heads/camel-2.15.x
Commit: ac31039c9d26e567698bf70aa8e727fed0cbfc42
Parents: 7cdb7c1
Author: Raul Kripalani <raulk@apache.org>
Authored: Fri Aug 28 15:44:35 2015 +0100
Committer: Raul Kripalani <raulk@apache.org>
Committed: Fri Aug 28 15:57:39 2015 +0100

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTEndpoint.java      | 139 +++++++++++++++-
 .../component/mqtt/MQTTDuplicatesTest.java      | 158 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |   2 +-
 3 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index cfd2eb9..94b3008 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -39,6 +39,21 @@ import org.fusesource.mqtt.client.Listener;
 import org.fusesource.mqtt.client.Promise;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,14 +69,129 @@ public class MQTTEndpoint extends DefaultEndpoint {
     private CallbackConnection connection;
     @UriPath @Metadata(required = "true")
     private String name;
+
     @UriParam
     private final MQTTConfiguration configuration;
     private volatile boolean connected;
     private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>();
 
-    public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) {
+    public MQTTEndpoint(final String uri, MQTTComponent component, MQTTConfiguration properties) {
         super(uri, component);
         this.configuration = properties;
+        if (LOG.isTraceEnabled()) {
+            configuration.setTracer(new Tracer() {
+                @Override
+                public void debug(String message, Object...args) {
+                    LOG.trace("tracer.debug() " + this + ": uri=" + uri + ", message=" + String.format(message, args));
+                }
+
+                @Override
+                public void onSend(MQTTFrame frame) {
+                    String decoded = null;
+                    try {
+                        switch (frame.messageType()) {
+                        case PINGREQ.TYPE:
+                            decoded = new PINGREQ().decode(frame).toString();
+                            break;
+                        case PINGRESP.TYPE:
+                            decoded = new PINGRESP().decode(frame).toString();
+                            break;
+                        case CONNECT.TYPE:
+                            decoded = new CONNECT().decode(frame).toString();
+                            break;
+                        case DISCONNECT.TYPE:
+                            decoded = new DISCONNECT().decode(frame).toString();
+                            break;
+                        case SUBSCRIBE.TYPE:
+                            decoded = new SUBSCRIBE().decode(frame).toString();
+                            break;
+                        case UNSUBSCRIBE.TYPE:
+                            decoded = new UNSUBSCRIBE().decode(frame).toString();
+                            break;
+                        case PUBLISH.TYPE:
+                            decoded = new PUBLISH().decode(frame).toString();
+                            break;
+                        case PUBACK.TYPE:
+                            decoded = new PUBACK().decode(frame).toString();
+                            break;
+                        case PUBREC.TYPE:
+                            decoded = new PUBREC().decode(frame).toString();
+                            break;
+                        case PUBREL.TYPE:
+                            decoded = new PUBREL().decode(frame).toString();
+                            break;
+                        case PUBCOMP.TYPE:
+                            decoded = new PUBCOMP().decode(frame).toString();
+                            break;
+                        case CONNACK.TYPE:
+                            decoded = new CONNACK().decode(frame).toString();
+                            break;
+                        case SUBACK.TYPE:
+                            decoded = new SUBACK().decode(frame).toString();
+                            break;
+                        default:
+                            decoded = frame.toString();
+                        }
+                    } catch (Throwable e) {
+                        decoded = frame.toString();
+                    }
+                    LOG.trace("tracer.onSend() " + this + ":  uri=" + uri + ", frame=" + decoded);
+                }
+
+                @Override
+                public void onReceive(MQTTFrame frame) {
+                    String decoded = null;
+                    try {
+                        switch (frame.messageType()) {
+                        case PINGREQ.TYPE:
+                            decoded = new PINGREQ().decode(frame).toString();
+                            break;
+                        case PINGRESP.TYPE:
+                            decoded = new PINGRESP().decode(frame).toString();
+                            break;
+                        case CONNECT.TYPE:
+                            decoded = new CONNECT().decode(frame).toString();
+                            break;
+                        case DISCONNECT.TYPE:
+                            decoded = new DISCONNECT().decode(frame).toString();
+                            break;
+                        case SUBSCRIBE.TYPE:
+                            decoded = new SUBSCRIBE().decode(frame).toString();
+                            break;
+                        case UNSUBSCRIBE.TYPE:
+                            decoded = new UNSUBSCRIBE().decode(frame).toString();
+                            break;
+                        case PUBLISH.TYPE:
+                            decoded = new PUBLISH().decode(frame).toString();
+                            break;
+                        case PUBACK.TYPE:
+                            decoded = new PUBACK().decode(frame).toString();
+                            break;
+                        case PUBREC.TYPE:
+                            decoded = new PUBREC().decode(frame).toString();
+                            break;
+                        case PUBREL.TYPE:
+                            decoded = new PUBREL().decode(frame).toString();
+                            break;
+                        case PUBCOMP.TYPE:
+                            decoded = new PUBCOMP().decode(frame).toString();
+                            break;
+                        case CONNACK.TYPE:
+                            decoded = new CONNACK().decode(frame).toString();
+                            break;
+                        case SUBACK.TYPE:
+                            decoded = new SUBACK().decode(frame).toString();
+                            break;
+                        default:
+                            decoded = frame.toString();
+                        }
+                    } catch (Throwable e) {
+                        decoded = frame.toString();
+                    }
+                    LOG.trace("tracer.onReceive() " + this + ":  uri=" + uri + ", frame=" + decoded);
+                }
+            });
+        }
     }
 
     @Override
@@ -105,7 +235,11 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
 
             public void onDisconnected() {
-                connected = false;
+                // no connected = false required here because the MQTT client should trigger its own reconnect;
+                // setting connected = false would make the publish() method to launch a new connection while the original
+                // one is still reconnecting, likely leading to duplicate messages as observed in CAMEL-9092;
+                // if retries are exhausted and it desists, we should get a callback on onFailure, and then we can set
+                // connected = false safely
                 LOG.debug("MQTT Connection disconnected from {}", configuration.getHost());
             }
 
@@ -177,6 +311,7 @@ public class MQTTEndpoint extends DefaultEndpoint {
                         }
 
                         public void onFailure(Throwable value) {
+                            LOG.debug("Failed to subscribe", value);
                             promise.onFailure(value);
                             connection.disconnect(null);
                             connected = false;

http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
new file mode 100644
index 0000000..c397a1f
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTDuplicatesTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests duplicate delivery via mqtt consumer.
+ * 
+ * @version
+ */
+public class MQTTDuplicatesTest extends MQTTBaseTest {
+
+    private static final int MESSAGE_COUNT = 50;
+    private static final int WAIT_MILLIS = 100;
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:withClientID")
+    protected ProducerTemplate templateWithClientID;
+
+    @Produce(uri = "direct:withoutClientID")
+    protected ProducerTemplate templateWithoutClientID;
+
+    @Test
+    public void testMqttDuplicates() throws Exception {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            String body = System.currentTimeMillis() + ": Dummy! " + i;
+            templateWithClientID.asyncSendBody("direct:withClientID", body);
+            Thread.sleep(WAIT_MILLIS);
+        }
+
+        assertNoDuplicates();
+    }
+
+    @Test
+    public void testMqttDuplicatesAfterBrokerRestartWithoutClientID() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+
+        LOG.info(">>>>>>>>>> Restarting broker...");
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        LOG.info(">>>>>>>>>> Broker restarted");
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            String body = System.currentTimeMillis() + ": Dummy-restart-without-clientID! " + i;
+            templateWithoutClientID.asyncSendBody("direct:withoutClientID", body);
+            Thread.sleep(WAIT_MILLIS);
+        }
+
+        assertNoDuplicates();
+    }
+
+    @Test
+    public void testMqttDuplicatesAfterBrokerRestartWithClientID() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+
+        LOG.info(">>>>>>>>>> Restarting broker...");
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("mqtt://127.0.0.1:1883?trace=true");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        LOG.info(">>>>>>>>>> Broker restarted");
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            String body = System.currentTimeMillis() + ": Dummy-restart-with-clientID! " + i;
+            templateWithClientID.asyncSendBody("direct:withClientID", body);
+            Thread.sleep(WAIT_MILLIS);
+        }
+
+        assertNoDuplicates();
+    }
+
+    private void assertNoDuplicates() {
+        List<Exchange> exchanges = resultEndpoint.getExchanges();
+        Assert.assertTrue("No message was delivered - something wrong happened", exchanges.size() > 0);
+        Set<String> values = new HashSet<String>();
+        List<String> duplicates = new ArrayList<String>();
+        for (Exchange e : exchanges) {
+            String body = e.getIn().getBody(String.class);
+            if (values.contains(body)) {
+                duplicates.add(body);
+            }
+            values.add(body);
+        }
+        Assert.assertTrue("Duplicate messages are detected: " + duplicates.toString(), duplicates.isEmpty());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+
+                // --------------------
+                //  Without client ID:
+                // --------------------
+                from("direct:withoutClientID")
+                    .routeId("SenderWithoutClientID")
+                    .log("$$$$$ Sending message: ${body}")
+                    .to("mqtt:sender?publishTopicName=test/topic1&qualityOfService=ExactlyOnce");
+            
+                from("mqtt:reader?subscribeTopicName=test/topic1&qualityOfService=ExactlyOnce")
+                    .routeId("ReceiverWithoutClientID")
+                    .log("$$$$$ Received message: ${body}")
+                    .to("mock:result");
+
+                // --------------------
+                //  With client ID:
+                // --------------------
+                from("direct:withClientID")
+                    .routeId("SenderWithClientID")
+                    .log("$$$$$ Sending message: ${body}")
+                    .to("mqtt:sender?publishTopicName=test/topic2&clientId=sender&qualityOfService=ExactlyOnce");
+                
+                from("mqtt:reader?subscribeTopicName=test/topic2&clientId=receiver&qualityOfService=ExactlyOnce")
+                    .routeId("ReceiverWithClientID")
+                    .log("$$$$$ Received message: ${body}")
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ac31039c/components/camel-mqtt/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/resources/log4j.properties b/components/camel-mqtt/src/test/resources/log4j.properties
index b15a0cd..d7962ca 100644
--- a/components/camel-mqtt/src/test/resources/log4j.properties
+++ b/components/camel-mqtt/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
 log4j.rootLogger=info, file
 
 #log4j.logger.twitter4j=DEBUG
-#log4j.logger.org.apache.camel.component.mqtt=DEBUG
+#log4j.logger.org.apache.camel.component.mqtt=TRACE
 #log4j.logger.org.apache.camel=DEBUG
 
 # CONSOLE appender not used by default


Mime
View raw message