camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/2] camel git commit: CAMEL-8400 support multiple topic subscriptions in camel-mqtt with thanks to Mark
Date Thu, 26 Feb 2015 11:31:49 GMT
CAMEL-8400 support multiple topic subscriptions in camel-mqtt with thanks to Mark


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92ae10a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92ae10a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92ae10a1

Branch: refs/heads/master
Commit: 92ae10a1276fc8f75735f854d65a9015cb7cda64
Parents: 8fbd40f
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Thu Feb 26 19:24:25 2015 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Thu Feb 26 19:31:33 2015 +0800

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTConfiguration.java | 10 ++++
 .../camel/component/mqtt/MQTTEndpoint.java      | 30 ++++++++--
 .../camel/component/mqtt/MQTTBaseTest.java      |  2 +
 .../component/mqtt/MQTTConfigurationTest.java   | 12 ++++
 .../mqtt/MQTTConsumerMultipleTopicsTest.java    | 63 ++++++++++++++++++++
 5 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/92ae10a1/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
index 97e906c..b7252a5 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
@@ -83,6 +83,8 @@ public class MQTTConfiguration extends MQTT {
      */
     @UriParam
     private String subscribeTopicName = "";
+    @UriParam
+    private String subscribeTopicNames = "";
     @UriParam(defaultValue = "camel/mqtt/test")
     private String publishTopicName = "camel/mqtt/test";
     @UriParam(defaultValue = "10")
@@ -118,6 +120,14 @@ public class MQTTConfiguration extends MQTT {
         this.subscribeTopicName = subscribeTopicName;
     }
 
+    public String getSubscribeTopicNames() {
+        return subscribeTopicNames;
+    }
+
+    public void setSubscribeTopicNames(String subscribeTopicNames) {
+        this.subscribeTopicNames = subscribeTopicNames;
+    }
+
     public String getPublishTopicName() {
         return publishTopicName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/92ae10a1/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 3563f6e..3955240 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -167,11 +167,8 @@ public class MQTTEndpoint extends DefaultEndpoint {
             public void onSuccess(Void value) {
                 LOG.debug("Connected to {}", configuration.getHost());
 
-                String subscribeTopicName = configuration.getSubscribeTopicName();
-                subscribeTopicName = subscribeTopicName != null ? subscribeTopicName.trim()
: null;
-
-                if (subscribeTopicName != null && !subscribeTopicName.isEmpty())
{
-                    Topic[] topics = {new Topic(subscribeTopicName, configuration.getQoS())};
+                Topic[] topics = createSubscribeTopics();
+                if (topics != null && topics.length > 0) {
                     connection.subscribe(topics, new Callback<byte[]>() {
                         public void onSuccess(byte[] value) {
                             promise.onSuccess(value);
@@ -201,7 +198,28 @@ public class MQTTEndpoint extends DefaultEndpoint {
         LOG.info("Connecting to {} using {} seconds timeout", configuration.getHost(), configuration.getConnectWaitInSeconds());
         promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
     }
-    
+
+    Topic[] createSubscribeTopics() {
+        String subscribeTopicList = configuration.getSubscribeTopicNames();
+        if (subscribeTopicList != null && !subscribeTopicList.isEmpty()) {
+            String[] topicNames = subscribeTopicList.split(",");
+            Topic[] topics = new Topic[topicNames.length];
+            for (int i = 0; i < topicNames.length; i++) {
+                topics[i] = new Topic(topicNames[i].trim(), configuration.getQoS());
+            }
+            return topics;
+        } else { // fall back on singular topic name
+            String subscribeTopicName = configuration.getSubscribeTopicName();
+            subscribeTopicName = subscribeTopicName != null ? subscribeTopicName.trim() :
null;
+            if (subscribeTopicName != null && !subscribeTopicName.isEmpty()) {
+                Topic[] topics = {new Topic(subscribeTopicName, configuration.getQoS())};
+                return topics;
+            }
+        }
+        LOG.warn("No topic subscriptions were specified in configuration");
+        return null;
+    }
+
     boolean isConnected() {
         return connected;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/92ae10a1/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
index f0d9460..50902a1 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 public abstract class MQTTBaseTest extends CamelTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(MQTTBaseTest.class);
     protected static final String TEST_TOPIC = "ComponentTestTopic";
+    protected static final String TEST_TOPIC_2 = "AnotherTestTopic";
+    protected static final String TEST_TOPICS = TEST_TOPIC + "," + TEST_TOPIC_2;
     protected BrokerService brokerService;
     protected int numberOfMessages = 100;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/92ae10a1/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
index 6cc0f69..e66e6a1 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
@@ -33,4 +33,16 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
         assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicName(), TEST_TOPIC);
         assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
     }
+
+    @Test
+    public void testMultipleSubscribeTopicsConfiguration() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?byDefaultRetain=true&qualityOfService=exactlyOnce&publishTopicName="
+ TEST_TOPIC + "&subscribeTopicNames=" + TEST_TOPICS);
+        assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
+        MQTTEndpoint mqttEndpoint = (MQTTEndpoint) endpoint;
+
+        assertEquals(mqttEndpoint.getConfiguration().getQoS(), QoS.EXACTLY_ONCE);
+        assertEquals(mqttEndpoint.getConfiguration().getPublishTopicName(), TEST_TOPIC);
+        assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS);
+        assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain());
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/92ae10a1/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerMultipleTopicsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerMultipleTopicsTest.java
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerMultipleTopicsTest.java
new file mode 100644
index 0000000..eb765eb
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerMultipleTopicsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MQTTConsumerMultipleTopicsTest extends MQTTBaseTest {
+
+    @Test
+    public void testConsumeMultipleTopics() throws Exception {
+        MQTT mqtt = new MQTT();
+        BlockingConnection publisherConnection = mqtt.blockingConnection();
+        Topic topic1 = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE);
+        Topic topic2 = new Topic(TEST_TOPIC_2, QoS.AT_MOST_ONCE);
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(numberOfMessages * 2);
+
+        publisherConnection.connect();
+        String payload;
+        for (int i = 0; i < numberOfMessages; i++) {
+            payload = "Topic 1, Message " + i;
+            publisherConnection.publish(topic1.name().toString(), payload.getBytes(), QoS.AT_LEAST_ONCE,
false);
+            payload = "Topic 2, Message " + i;
+            publisherConnection.publish(topic2.name().toString(), payload.getBytes(), QoS.AT_LEAST_ONCE,
false);
+        }
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+
+        return new RouteBuilder() {
+            public void configure() {
+                from("mqtt:bar?subscribeTopicNames=" + TEST_TOPICS)
+                    .transform(body().convertToString())
+                    .to("mock:result");
+            }
+        };
+    }
+}


Mime
View raw message