storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [04/14] storm git commit: STORM-2416 Release Packaging Improvements
Date Wed, 05 Apr 2017 22:34:00 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
new file mode 100644
index 0000000..2b09d6e
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * MQTT configuration options.
+ *
+ * A serializable, mutable holder for the connection settings of an MQTT client. Values can be
+ * assigned through the setters or through the nested {@code Builder}. Only the two QoS settings
+ * are validated; every other value is stored exactly as given.
+ */
+public class MqttOptions implements Serializable {
+    private String url = "tcp://localhost:1883";
+    private List<String> topics = null;
+    private boolean cleanConnection = false;
+
+    private String willTopic;
+    private String willPayload;
+    private int willQos = 1;
+    private boolean willRetain = false;
+
+    private long reconnectDelay = 10;
+    private long reconnectDelayMax = 30*1000;
+    private double reconnectBackOffMultiplier = 2.0f;
+    private long reconnectAttemptsMax = -1;
+    private long connectAttemptsMax = -1;
+
+    private String userName = "";
+    private String password = "";
+
+    private int qos = 1;
+
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * Sets the URL used to connect to the MQTT broker.
+     *
+     * Default: {@code tcp://localhost:1883}
+     *
+     * @param url the broker URL; stored as given, without validation
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    /**
+     * Sets the list of MQTT topics to subscribe to. The list reference is stored as given (no copy
+     * is made). Defaults to {@code null}.
+     *
+     * @param topics the topics to subscribe to
+     */
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    public boolean isCleanConnection() {
+        return cleanConnection;
+    }
+
+    /**
+     * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client
+     * sessions. Defaults to false.
+     *
+     * @param cleanConnection false to keep session state on the server, true to start with a clean session
+     */
+    public void setCleanConnection(boolean cleanConnection) {
+        this.cleanConnection = cleanConnection;
+    }
+
+    public String getWillTopic() {
+        return willTopic;
+    }
+
+    /**
+     * If set, the server will publish the client's Will message to the specified topic if the client
+     * disconnects unexpectedly. Unset ({@code null}) by default.
+     *
+     * @param willTopic the topic the Will message is published to
+     */
+    public void setWillTopic(String willTopic) {
+        this.willTopic = willTopic;
+    }
+
+    public String getWillPayload() {
+        return willPayload;
+    }
+
+    /**
+     * The Will message to send. This class sets no default: the payload is {@code null} until assigned.
+     *
+     * @param willPayload the Will message payload
+     */
+    public void setWillPayload(String willPayload) {
+        this.willPayload = willPayload;
+    }
+
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    /**
+     * How long to wait, in milliseconds, before the first reconnect attempt. Defaults to 10.
+     *
+     * @param reconnectDelay the initial reconnect delay in milliseconds
+     */
+    public void setReconnectDelay(long reconnectDelay) {
+        this.reconnectDelay = reconnectDelay;
+    }
+
+    public long getReconnectDelayMax() {
+        return reconnectDelayMax;
+    }
+
+    /**
+     * The maximum amount of time, in milliseconds, to wait between reconnect attempts. Defaults to 30,000.
+     *
+     * @param reconnectDelayMax the upper bound on the reconnect delay in milliseconds
+     */
+    public void setReconnectDelayMax(long reconnectDelayMax) {
+        this.reconnectDelayMax = reconnectDelayMax;
+    }
+
+    public double getReconnectBackOffMultiplier() {
+        return reconnectBackOffMultiplier;
+    }
+
+    /**
+     * The exponential backoff multiplier to be used between reconnect attempts. Set to 1 to disable
+     * exponential backoff. Defaults to 2.
+     *
+     * @param reconnectBackOffMultiplier the backoff multiplier
+     */
+    public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+        this.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+    }
+
+    public long getReconnectAttemptsMax() {
+        return reconnectAttemptsMax;
+    }
+
+    /**
+     * The maximum number of reconnect attempts before an error is reported back to the client after a server
+     * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.
+     *
+     * @param reconnectAttemptsMax the reconnect attempt limit, or -1 for no limit
+     */
+    public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
+        this.reconnectAttemptsMax = reconnectAttemptsMax;
+    }
+
+    public long getConnectAttemptsMax() {
+        return connectAttemptsMax;
+    }
+
+    /**
+     * The maximum number of connection attempts before an error is reported back to the client when the
+     * client first tries to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1.
+     *
+     * @param connectAttemptsMax the initial-connect attempt limit, or -1 for no limit
+     */
+    public void setConnectAttemptsMax(long connectAttemptsMax) {
+        this.connectAttemptsMax = connectAttemptsMax;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * The username for authenticated sessions. Defaults to the empty string.
+     *
+     * @param userName the username
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * The password for authenticated sessions. Defaults to the empty string.
+     *
+     * @param password the password
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getQos(){
+        return this.qos;
+    }
+
+    /**
+     * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once).
+     *
+     * @param qos the QoS level; must be 0, 1 or 2
+     * @throws IllegalArgumentException if {@code qos} is less than 0 or greater than 2
+     */
+    public void setQos(int qos){
+        if(qos < 0 || qos > 2){
+            throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
+        }
+        this.qos = qos;
+    }
+
+    public int getWillQos(){
+        return this.willQos;
+    }
+
+    /**
+     * Sets the quality of service to use for the MQTT Will message. Defaults to 1 (at least once).
+     *
+     * @param qos the Will QoS level; must be 0, 1 or 2
+     * @throws IllegalArgumentException if {@code qos} is less than 0 or greater than 2
+     */
+    public void setWillQos(int qos){
+        if(qos < 0 || qos > 2){
+            throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
+        }
+        this.willQos = qos;
+    }
+
+    public boolean getWillRetain(){
+        return this.willRetain;
+    }
+
+    /**
+     * Set to true if you want the Will message to be published with the retain option. Defaults to false.
+     *
+     * @param retain true to publish the Will message with the retain option
+     */
+    public void setWillRetain(boolean retain){
+        this.willRetain = retain;
+    }
+
+    public static class Builder { // fluent builder; each method sets one option and returns this builder
+        private MqttOptions options = new MqttOptions();
+
+        public Builder url(String url) {
+            this.options.url = url;
+            return this;
+        }
+
+
+        public Builder topics(List<String> topics) {
+            this.options.topics = topics;
+            return this;
+        }
+
+        public Builder cleanConnection(boolean cleanConnection) {
+            this.options.cleanConnection = cleanConnection;
+            return this;
+        }
+
+        public Builder willTopic(String willTopic) {
+            this.options.willTopic = willTopic;
+            return this;
+        }
+
+        public Builder willPayload(String willPayload) {
+            this.options.willPayload = willPayload;
+            return this;
+        }
+
+        public Builder willRetain(boolean retain){
+            this.options.willRetain = retain;
+            return this;
+        }
+
+        public Builder willQos(int qos){
+            this.options.setWillQos(qos); // goes through the setter so the 0..2 range check applies
+            return this;
+        }
+
+        public Builder reconnectDelay(long reconnectDelay) {
+            this.options.reconnectDelay = reconnectDelay;
+            return this;
+        }
+
+        public Builder reconnectDelayMax(long reconnectDelayMax) {
+            this.options.reconnectDelayMax = reconnectDelayMax;
+            return this;
+        }
+
+        public Builder reconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+            this.options.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+            return this;
+        }
+
+        public Builder reconnectAttemptsMax(long reconnectAttemptsMax) {
+            this.options.reconnectAttemptsMax = reconnectAttemptsMax;
+            return this;
+        }
+
+        public Builder connectAttemptsMax(long connectAttemptsMax) {
+            this.options.connectAttemptsMax = connectAttemptsMax;
+            return this;
+        }
+
+        public Builder userName(String userName) {
+            this.options.userName = userName;
+            return this;
+        }
+
+        public Builder password(String password) {
+            this.options.password = password;
+            return this;
+        }
+
+        public Builder qos(int qos){
+            this.options.setQos(qos); // goes through the setter so the 0..2 range check applies
+            return this;
+        }
+
+        public MqttOptions build() {
+            return this.options; // the builder's own instance, not a copy: later builder calls still modify it
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
new file mode 100644
index 0000000..9b36b78
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class MqttPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+    private MqttOptions options;
+    private transient BlockingConnection connection;
+    private KeyStoreLoader keyStoreLoader;
+    private QoS qos;
+    private boolean retain = false;
+
+
+    public MqttPublisher(MqttOptions options){
+        this(options, null, false);
+    }
+
+    public MqttPublisher(MqttOptions options, boolean retain){
+        this(options, null, retain);
+    }
+
+    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
+        this.retain = retain;
+        this.options = options;
+        this.keyStoreLoader = keyStoreLoader;
+        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+        this.qos = MqttUtils.qosFromInt(this.options.getQos());
+    }
+
+    public void publish(MqttMessage message) throws Exception {
+        this.connection.publish(message.getTopic(), message.getMessage(), this.qos, this.retain);
+    }
+
+    public void connectMqtt(String clientId) throws Exception {
+        MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
+        this.connection = client.blockingConnection();
+        this.connection.connect();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
new file mode 100644
index 0000000..4ca0145
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+/**
+ * Static helpers for turning {@link MqttOptions} into a configured MQTT client.
+ */
+public class MqttUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class);
+
+    private MqttUtils(){}
+
+    /**
+     * Maps a numeric MQTT quality-of-service level to the client library's {@link QoS} constant.
+     *
+     * @param i the QoS level: 0, 1 or 2
+     * @return {@code AT_MOST_ONCE} for 0, {@code AT_LEAST_ONCE} for 1, {@code EXACTLY_ONCE} for 2
+     * @throws IllegalArgumentException if {@code i} is not 0, 1 or 2
+     */
+    public static QoS qosFromInt(int i){
+        switch(i) {
+            case 0:
+                return QoS.AT_MOST_ONCE;
+            case 1:
+                return QoS.AT_LEAST_ONCE;
+            case 2:
+                return QoS.EXACTLY_ONCE;
+            default:
+                // The space after the number was missing, which rendered e.g. "7is not a valid MQTT QoS."
+                throw new IllegalArgumentException(i + " is not a valid MQTT QoS.");
+        }
+    }
+
+
+    /**
+     * Builds an {@link MQTT} client from the given options.
+     *
+     * An SSL context is installed for every URL scheme other than {@code tcp}. The Will settings are
+     * applied only when both the Will topic and the Will payload are non-null.
+     *
+     * @param options the connection options to apply
+     * @param clientId the MQTT client id
+     * @param keyStoreLoader source of key/trust store material; only used for non-{@code tcp} URLs
+     * @return the configured, not yet connected, client
+     * @throws IllegalArgumentException if the URL has no scheme or the Will QoS is invalid
+     * @throws Exception if the SSL context cannot be created
+     */
+    public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader)
+            throws Exception{
+
+        MQTT client = new MQTT();
+        URI uri = URI.create(options.getUrl());
+        String scheme = uri.getScheme();
+        if(scheme == null){
+            // URI.getScheme() is null for scheme-less input; previously this surfaced as a NullPointerException.
+            throw new IllegalArgumentException("MQTT URL must include a scheme, e.g. tcp://localhost:1883");
+        }
+
+        client.setHost(uri);
+        if(!"tcp".equalsIgnoreCase(scheme)){
+            client.setSslContext(SslUtils.sslContext(scheme, keyStoreLoader));
+        }
+        client.setClientId(clientId);
+        // The logger formats the argument itself, so no explicit toString() is needed.
+        LOG.info("MQTT ClientID: {}", client.getClientId());
+        client.setCleanSession(options.isCleanConnection());
+
+        client.setReconnectDelay(options.getReconnectDelay());
+        client.setReconnectDelayMax(options.getReconnectDelayMax());
+        client.setReconnectBackOffMultiplier(options.getReconnectBackOffMultiplier());
+        client.setConnectAttemptsMax(options.getConnectAttemptsMax());
+        client.setReconnectAttemptsMax(options.getReconnectAttemptsMax());
+
+
+        client.setUserName(options.getUserName());
+        client.setPassword(options.getPassword());
+        client.setTracer(new MqttLogger());
+
+        if(options.getWillTopic() != null && options.getWillPayload() != null){
+            client.setWillQos(qosFromInt(options.getWillQos()));
+            client.setWillTopic(options.getWillTopic());
+            client.setWillMessage(options.getWillPayload());
+            client.setWillRetain(options.getWillRetain());
+        }
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
new file mode 100644
index 0000000..681fc1d
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import java.net.URI;
+import java.security.KeyStore;
+
+/**
+ * Static helpers for validating TLS/SSL related MQTT configuration and building an {@link SSLContext}.
+ */
+public class SslUtils {
+    private SslUtils(){}
+
+    /**
+     * Verifies that the URL scheme is supported and that a key store loader is present when it is needed.
+     *
+     * Accepted schemes are {@code tcp} and anything starting with {@code tls} or {@code ssl}
+     * (compared in lower case).
+     *
+     * @param url the MQTT broker URL
+     * @param loader the key store loader; may be {@code null} only for {@code tcp} URLs
+     * @throws IllegalArgumentException if the URL has no scheme or the scheme is not recognized
+     * @throws IllegalStateException if a non-{@code tcp} scheme is used and {@code loader} is {@code null}
+     */
+    public static void checkSslConfig(String url, KeyStoreLoader loader){
+        URI uri = URI.create(url);
+        String rawScheme = uri.getScheme();
+        if(rawScheme == null){
+            // Previously a scheme-less URL failed with a NullPointerException on toLowerCase().
+            throw new IllegalArgumentException("MQTT URL has no URI scheme; expected tcp, tls* or ssl*");
+        }
+        String scheme = rawScheme.toLowerCase();
+        if(!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))){
+            throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme);
+        }
+        if(!scheme.equals("tcp") && loader == null){
+            throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. " +
+                    "A KeyStoreLoader implementation is required when using TLS/SSL.");
+        }
+    }
+
+    /**
+     * Creates an {@link SSLContext} initialized with the key and trust material supplied by the loader.
+     *
+     * Both stores are read as JKS. The upper-cased URL scheme is used as the {@code SSLContext}
+     * protocol name. The input streams obtained from the loader are closed once they have been read.
+     *
+     * @param scheme the URL scheme, e.g. {@code tls} or {@code ssl}
+     * @param keyStoreLoader supplies the key store and trust store streams and their passwords
+     * @return the initialized SSL context
+     * @throws Exception if a store cannot be read or the context cannot be created
+     */
+    public static SSLContext sslContext(String scheme, KeyStoreLoader keyStoreLoader) throws Exception {
+        KeyStore ks = loadJks(keyStoreLoader.keyStoreInputStream(), keyStoreLoader.keyStorePassword());
+        KeyStore ts = loadJks(keyStoreLoader.trustStoreInputStream(), keyStoreLoader.trustStorePassword());
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(ks, keyStoreLoader.keyPassword().toCharArray());
+
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(ts);
+
+        SSLContext sc = SSLContext.getInstance(scheme.toUpperCase());
+        TrustManager[] trustManagers = tmf.getTrustManagers();
+        sc.init(kmf.getKeyManagers(), trustManagers, null);
+
+        return sc;
+    }
+
+    /** Loads a JKS store from the stream and always closes the stream afterwards. */
+    private static KeyStore loadJks(java.io.InputStream in, String password) throws Exception {
+        try {
+            KeyStore store = KeyStore.getInstance("JKS");
+            store.load(in, password.toCharArray());
+            return store;
+        } finally {
+            // KeyStore.load() does not close its stream, so the caller has to.
+            if(in != null){
+                in.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
new file mode 100644
index 0000000..a19fce4
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.mappers;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+
+
+/**
+ * Maps an MQTT message to a two-field tuple, "topic" and "message", passing the
+ * message payload through unchanged.
+ */
+public class ByteArrayMessageMapper implements MqttMessageMapper {
+    /**
+     * Converts the message into tuple values.
+     *
+     * @param message the MQTT message to convert
+     * @return the topic followed by the unmodified payload
+     */
+    public Values toValues(MqttMessage message) {
+        final String topic = message.getTopic();
+        final byte[] payload = message.getMessage();
+        return new Values(topic, payload);
+    }
+
+    /**
+     * Names the fields produced by {@link #toValues(MqttMessage)}.
+     *
+     * @return the fields "topic" and "message", in that order
+     */
+    public Fields outputFields() {
+        final Fields declared = new Fields("topic", "message");
+        return declared;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
new file mode 100644
index 0000000..e5f309b
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.mappers;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.MqttMessageMapper;
+
+/**
+ * Given a String topic and byte[] message, emits a tuple with fields
+ * "topic" and "message", both of which are Strings.
+ *
+ * The payload is decoded as UTF-8 so that the result does not depend on the
+ * default charset of the JVM the mapper happens to run in.
+ */
+public class StringMessageMapper implements MqttMessageMapper {
+    // Charset.forName("UTF-8") cannot fail: UTF-8 is required to be present on every Java platform.
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+    /**
+     * Converts the message into tuple values.
+     *
+     * @param message the MQTT message to convert
+     * @return the topic followed by the payload decoded as a UTF-8 string
+     */
+    public Values toValues(MqttMessage message) {
+        return new Values(message.getTopic(), new String(message.getMessage(), UTF_8));
+    }
+
+    /**
+     * Names the fields produced by {@link #toValues(MqttMessage)}.
+     *
+     * @return the fields "topic" and "message", in that order
+     */
+    public Fields outputFields() {
+        return new Fields("topic", "message");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
new file mode 100644
index 0000000..08348c9
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.spout;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.storm.mqtt.MqttMessage;
+
+/**
+ * Pairs an MQTT message, a topic string (e.g. "/users/ptgoetz/office/thermostat") and a
+ * byte array message/payload, with the {@link Runnable} returned by {@link #ack()}.
+ *
+ * Equality and hash code are based on the topic and payload only; the ack callback is ignored.
+ */
+class AckableMessage {
+    private final String topic;
+    private final byte[] message;
+    private final Runnable ack;
+
+    /**
+     * @param topic the topic of the message
+     * @param message the message payload; the array is stored as given, without copying
+     * @param ack the callback handed back by {@link #ack()}
+     */
+    AckableMessage(String topic, byte[] message, Runnable ack){
+        this.topic = topic;
+        this.message = message;
+        this.ack = ack;
+    }
+
+    /**
+     * @return a new {@link MqttMessage} built from this object's topic and payload
+     */
+    public MqttMessage getMessage(){
+        return new MqttMessage(this.topic, this.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(71, 123)
+                .append(this.topic)
+                .append(this.message)
+                .toHashCode();
+    }
+
+
+    /**
+     * Two instances are equal when they are of the same class and have equal topics and
+     * payloads with equal contents.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) { return true; }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        AckableMessage other = (AckableMessage)obj;
+        // Deliberately no appendSuper(super.equals(obj)): the superclass is Object, whose equals()
+        // is an identity test, so appending it made every pair of distinct instances unequal
+        // regardless of topic and payload.
+        return new EqualsBuilder()
+                .append(this.topic, other.topic)
+                .append(this.message, other.message)
+                .isEquals();
+    }
+
+    /**
+     * @return the callback supplied at construction
+     */
+    Runnable ack(){
+        return this.ack;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
new file mode 100644
index 0000000..7f10cc5
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.spout;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.mqtt.MqttMessageMapper;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.common.MqttUtils;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.Listener;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Spout that subscribes to MQTT topics and emits every received message as a tuple.
+ *
+ * <p>Messages delivered to {@link #onPublish} are buffered in {@code incoming}.
+ * {@link #nextTuple()} takes one message at a time off that queue, emits it with a generated
+ * message id and keeps it in {@code pending} until Storm acks or fails that id. On ack the
+ * message's MQTT ack callback is run on the connection's dispatch queue; on fail the message is
+ * put back on the queue so it is emitted again.
+ */
+public class MqttSpout implements IRichSpout, Listener {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
+
+    private String topologyName;
+
+
+    // Created in open(), so it never needs to travel with the serialized spout.
+    private transient CallbackConnection connection;
+
+    protected transient SpoutOutputCollector collector;
+    protected transient TopologyContext context;
+    protected transient LinkedBlockingQueue<AckableMessage> incoming;
+    protected transient HashMap<Long, AckableMessage> pending;
+    private transient Map conf;
+    protected MqttMessageMapper type;
+    protected MqttOptions options;
+    protected KeyStoreLoader keyStoreLoader;
+
+    // Set from the connect callback while connectMqtt() polls them in a sleep loop; volatile
+    // guarantees the polling thread sees the update if the callback runs on another thread.
+    private volatile boolean mqttConnected = false;
+    private volatile boolean mqttConnectFailed = false;
+    // Cause reported by the connect callback, kept so connectMqtt() can surface it.
+    private transient volatile Throwable connectFailure;
+
+
+    private long sequence = Long.MIN_VALUE;
+
+    /**
+     * Produces the next message id. The counter is incremented on every call and reset to
+     * {@code Long.MIN_VALUE} once it reaches {@code Long.MAX_VALUE}.
+     */
+    private Long nextId(){
+        this.sequence++;
+        if(this.sequence == Long.MAX_VALUE){
+            this.sequence = Long.MIN_VALUE;
+        }
+        return this.sequence;
+    }
+
+    protected MqttSpout(){}
+
+    /**
+     * Creates a spout without a key store loader.
+     *
+     * @param type    mapper that declares the output fields and converts messages to values
+     * @param options MQTT connection options
+     */
+    public MqttSpout(MqttMessageMapper type, MqttOptions options){
+        this(type, options, null);
+    }
+
+    /**
+     * Creates a spout and validates the SSL configuration up front via
+     * {@code SslUtils.checkSslConfig}.
+     *
+     * @param type           mapper that declares the output fields and converts messages to values
+     * @param options        MQTT connection options
+     * @param keyStoreLoader key store loader, may be {@code null}
+     */
+    public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader){
+        this.type = type;
+        this.options = options;
+        this.keyStoreLoader = keyStoreLoader;
+        SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(this.type.outputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    /**
+     * Stores the Storm handles, creates the message buffers and connects to the MQTT broker.
+     *
+     * @throws RuntimeException if the MQTT connection cannot be set up; the error is also
+     *                          reported to the collector
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+
+        this.collector = collector;
+        this.context = context;
+        this.conf = conf;
+
+        this.incoming = new LinkedBlockingQueue<>();
+        this.pending = new HashMap<>();
+
+        try {
+            connectMqtt();
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            throw new RuntimeException("MQTT Connection failed.", e);
+        }
+
+    }
+
+    /**
+     * Connects to the broker, waits for the connect callback to report an outcome and then
+     * subscribes to the configured topics.
+     *
+     * @throws Exception if the client cannot be configured, the wait is interrupted, the
+     *                   connect callback reports a failure, or no topics are configured
+     */
+    private void connectMqtt() throws Exception {
+        String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" +
+                this.context.getThisTaskId();
+
+        MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
+        this.connection = client.callbackConnection();
+        this.connection.listener(this);
+        this.connection.connect(new ConnectCallback());
+
+        while(!this.mqttConnected && !this.mqttConnectFailed){
+            LOG.info("Waiting for connection...");
+            Thread.sleep(500);
+        }
+
+        if(!this.mqttConnected){
+            // A failed connect used to fall through silently, leaving a spout that was "open"
+            // but never subscribed. Surface it so open() reports the error and fails.
+            throw new IllegalStateException("MQTT connect callback reported a failure.", this.connectFailure);
+        }
+
+        List<String> topicList = this.options.getTopics();
+        if(topicList == null){
+            throw new IllegalStateException("No MQTT topics configured for the spout.");
+        }
+        Topic[] topics = new Topic[topicList.size()];
+        QoS qos = MqttUtils.qosFromInt(this.options.getQos());
+        for(int i = 0;i < topicList.size();i++){
+            topics[i] = new Topic(topicList.get(i), qos);
+        }
+        this.connection.subscribe(topics, new SubscribeCallback());
+    }
+
+
+
+    /**
+     * Disconnects from the broker if a connection was created.
+     */
+    @Override
+    public void close() {
+        if(this.connection != null){
+            this.connection.disconnect(new DisconnectCallback());
+        }
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    /**
+     * When this method is called, Storm is requesting that the Spout emit tuples to the
+     * output collector. This method should be non-blocking, so if the Spout has no tuples
+     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
+     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
+     * to have nextTuple sleep for a short amount of time (like a single millisecond)
+     * so as not to waste too much CPU.
+     */
+    @Override
+    public void nextTuple() {
+        AckableMessage tm = this.incoming.poll();
+        if(tm != null){
+            Long id = nextId();
+            this.collector.emit(this.type.toValues(tm.getMessage()), id);
+            this.pending.put(id, tm);
+        } else {
+            Thread.yield();
+        }
+
+    }
+
+    /**
+     * Storm has determined that the tuple emitted by this spout with the msgId identifier
+     * has been fully processed. The message is removed from the pending map and its MQTT
+     * ack callback is run on the connection's dispatch queue.
+     *
+     * @param msgId id the tuple was emitted with; ids that are not pending are logged and ignored
+     */
+    @Override
+    public void ack(Object msgId) {
+        AckableMessage msg = this.pending.remove(msgId);
+        if(msg == null){
+            LOG.warn("Received ack for unknown message id: {}", msgId);
+            return;
+        }
+        this.connection.getDispatchQueue().execute(msg.ack());
+    }
+
+    /**
+     * The tuple emitted by this spout with the msgId identifier has failed to be
+     * fully processed. The message is removed from the pending map and put back on
+     * the incoming queue so that it is emitted again.
+     *
+     * @param msgId id the tuple was emitted with; ids that are not pending are logged and ignored
+     */
+    @Override
+    public void fail(Object msgId) {
+        AckableMessage msg = this.pending.remove(msgId);
+        if(msg == null){
+            // Queueing null would throw a NullPointerException.
+            LOG.warn("Received fail for unknown message id: {}", msgId);
+            return;
+        }
+        try {
+            this.incoming.put(msg);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while re-queueing message.", e);
+            // Restore the flag so the owning thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+    // ################# Listener Implementation ######################
+    @Override
+    public void onConnected() {
+        // this gets called repeatedly for no apparent reason, don't do anything
+    }
+
+    @Override
+    public void onDisconnected() {
+        // this gets called repeatedly for no apparent reason, don't do anything
+    }
+
+    /**
+     * Buffers a message received from the broker until nextTuple() emits it.
+     */
+    @Override
+    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
+        String topicName = topic.toString();
+        // Copy the payload once and share it between the log line and the queued message.
+        byte[] body = payload.toByteArray();
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Received message: topic={}, payload={}", topicName, new String(body));
+        }
+        try {
+            this.incoming.put(new AckableMessage(topicName, body, ack));
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while queueing an MQTT message.");
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void onFailure(Throwable throwable) {
+        LOG.error("MQTT Connection Failure.", throwable);
+        MqttSpout.this.connection.disconnect(new DisconnectCallback());
+        throw new RuntimeException("MQTT Connection failure.", throwable);
+    }
+
+    // ################# Connect Callback Implementation ######################
+    private class ConnectCallback implements Callback<Void> {
+        @Override
+        public void onSuccess(Void v) {
+            LOG.info("MQTT Connected. Subscribing to topic...");
+            MqttSpout.this.mqttConnected = true;
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("MQTT Connection failed.", throwable);
+            // Record the cause before raising the flag that connectMqtt() polls.
+            MqttSpout.this.connectFailure = throwable;
+            MqttSpout.this.mqttConnectFailed = true;
+        }
+    }
+
+    // ################# Subscribe Callback Implementation ######################
+    private class SubscribeCallback implements Callback<byte[]>{
+        @Override
+        public void onSuccess(byte[] qos) {
+            LOG.info("Subscripton sucessful.");
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.error("MQTT Subscripton failed.", throwable);
+            throw new RuntimeException("MQTT Subscribe failed.", throwable);
+        }
+    }
+
+    // ################# Disconnect Callback Implementation ######################
+    private class DisconnectCallback implements Callback<Void>{
+        @Override
+        public void onSuccess(Void aVoid) {
+            LOG.info("MQTT Disconnect successful.");
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            // Disconnects don't fail.
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
new file mode 100644
index 0000000..8bca407
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.ssl;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+/**
+ * KeyStoreLoader implementation that uses local files.
+ */
+public class DefaultKeyStoreLoader implements KeyStoreLoader {
+    private String ksFile = null;
+    private String tsFile = null;
+    private String keyStorePassword = "";
+    private String trustStorePassword = "";
+    private String keyPassword = "";
+
+    /**
+     * Creates a DefaultKeystoreLoader that uses the same file
+     * for both the keystore and truststore.
+     *
+     * @param keystore path to keystore file
+     */
+    public DefaultKeyStoreLoader(String keystore){
+        this.ksFile = keystore;
+    }
+
+    /**
+     * Creates a DefaultKeystoreLoader that uses separate files
+     * for the keystore and truststore.
+     *
+     * @param keystore path to keystore file
+     * @param truststore path to truststore file
+     */
+    public DefaultKeyStoreLoader(String keystore, String truststore){
+        this.ksFile = keystore;
+        this.tsFile = truststore;
+    }
+
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.keyStorePassword = keyStorePassword;
+    }
+
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+
+    public void setKeyPassword(String keyPassword) {
+        this.keyPassword = keyPassword;
+    }
+
+    @Override
+    public InputStream keyStoreInputStream() throws FileNotFoundException {
+        return new FileInputStream(this.ksFile);
+    }
+
+    @Override
+    public InputStream trustStoreInputStream() throws FileNotFoundException {
+        // if no truststore file, assume the truststore is the keystore.
+        if(this.tsFile == null){
+            return new FileInputStream(this.ksFile);
+        } else {
+            return new FileInputStream(this.tsFile);
+        }
+    }
+
+    @Override
+    public String keyStorePassword() {
+        return this.keyStorePassword;
+    }
+
+    @Override
+    public String trustStorePassword() {
+        return this.trustStorePassword;
+    }
+
+    @Override
+    public String keyPassword() {
+        return this.keyPassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
new file mode 100644
index 0000000..297efcc
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * Abstraction for loading keystore/truststore data. This allows keystores
+ * to be loaded from different sources (File system, HDFS, etc.).
+ *
+ * <p>The interface is {@link Serializable}, so implementations must be serializable too.
+ */
+public interface KeyStoreLoader extends Serializable {
+
+    /**
+     * @return the keystore password (presumably the one protecting the data from
+     *         {@link #keyStoreInputStream()} - confirm against the SSL setup code)
+     */
+    String keyStorePassword();
+    /**
+     * @return the truststore password (presumably the one protecting the data from
+     *         {@link #trustStoreInputStream()} - confirm against the SSL setup code)
+     */
+    String trustStorePassword();
+    /**
+     * @return the key password (presumably for the private key inside the keystore -
+     *         confirm against the SSL setup code)
+     */
+    String keyPassword();
+    /**
+     * @return a stream over the keystore data
+     * @throws IOException if the keystore data cannot be opened
+     */
+    InputStream keyStoreInputStream() throws IOException;
+    /**
+     * @return a stream over the truststore data
+     * @throws IOException if the truststore data cannot be opened
+     */
+    InputStream trustStoreInputStream() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
new file mode 100644
index 0000000..e53c983
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Map;
+
+public class MqttPublishFunction extends BaseFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class);
+    private MqttTupleMapper mapper;
+    private transient MqttPublisher publisher;
+    private boolean retain = false;
+    private transient OutputCollector collector;
+    private MqttOptions options;
+    private KeyStoreLoader keyStoreLoader;
+    private transient String topologyName;
+
+
+    public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+        this.options = options;
+        this.mapper = mapper;
+        this.retain = retain;
+        this.keyStoreLoader = keyStoreLoader;
+        // the following code is duplicated in the constructor of MqttPublisher
+        // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology
+        // is deployed to the cluster
+        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+    }
+
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+        this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
+        try {
+            this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex());
+        } catch (Exception e) {
+            LOG.error("Unable to connect to MQTT Broker.", e);
+            throw new RuntimeException("Unable to connect to MQTT Broker.", e);
+        }
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        MqttMessage message = this.mapper.toMessage(tuple);
+        try {
+            this.publisher.publish(message);
+        } catch (Exception e) {
+            LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+            // should we fail the batch or kill the worker?
+            throw new FailedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
new file mode 100644
index 0000000..0dd4d73
--- /dev/null
+++ b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.ITuple;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.storm.mqtt.bolt.MqttBolt;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.mappers.StringMessageMapper;
+import org.apache.storm.mqtt.spout.MqttSpout;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * Integration test that runs a spout-to-bolt MQTT topology against an embedded broker.
+ *
+ * <p>A message published to {@code TEST_TOPIC} is picked up by the spout; the bolt then
+ * publishes {@code RESULT_PAYLOAD} to {@code RESULT_TOPIC}, which the test subscribes to
+ * and verifies.
+ */
+@Category(IntegrationTest.class)
+public class StormMqttIntegrationTest implements Serializable{
+    private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
+    private static BrokerService broker;
+    // Set by TestSpout.activate() and polled by the test in a sleep loop; volatile so the
+    // polling thread is guaranteed to see the update if activate() runs on another thread.
+    static volatile boolean spoutActivated = false;
+
+    private static final String TEST_TOPIC = "/mqtt-topology";
+    private static final String RESULT_TOPIC = "/integration-result";
+    private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
+
+    /**
+     * MqttSpout that flags its activation so the test knows when it is safe to publish.
+     */
+    public static class TestSpout extends MqttSpout{
+        public TestSpout(MqttMessageMapper type, MqttOptions options){
+            super(type, options);
+        }
+
+        @Override
+        public void activate() {
+            super.activate();
+            LOG.info("Spout activated.");
+            spoutActivated = true;
+        }
+    }
+
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        broker.stop();
+    }
+
+    @BeforeClass
+    public static void start() throws Exception {
+        LOG.warn("Starting broker...");
+        broker = new BrokerService();
+        broker.addConnector("mqtt://localhost:1883");
+        broker.setDataDirectory("target");
+        broker.start();
+        LOG.debug("MQTT broker started");
+    }
+
+
+    @Test
+    public void testMqttTopology() throws Exception {
+        MQTT client = new MQTT();
+        client.setTracer(new MqttLogger());
+        URI uri = URI.create("tcp://localhost:1883");
+        client.setHost(uri);
+
+        client.setClientId("MQTTSubscriber");
+        client.setCleanSession(false);
+        BlockingConnection connection = client.blockingConnection();
+        connection.connect();
+        Topic[] topics = {new Topic(RESULT_TOPIC, QoS.AT_LEAST_ONCE)};
+        connection.subscribe(topics);
+
+        try (LocalCluster cluster = new LocalCluster();
+             LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
+
+            LOG.info("topology started");
+            while(!spoutActivated) {
+                Thread.sleep(500);
+            }
+
+            // publish a retained message to the broker
+            MqttOptions options = new MqttOptions();
+            options.setCleanConnection(false);
+            MqttPublisher publisher = new MqttPublisher(options, true);
+            publisher.connectMqtt("MqttPublisher");
+            publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
+
+            LOG.info("published message");
+
+            Message message = connection.receive();
+            LOG.info("Message recieved on topic: {}", message.getTopic());
+            LOG.info("Payload: {}", new String(message.getPayload()));
+            message.ack();
+
+            // JUnit expects (expected, actual); the other order produces misleading failure text.
+            Assert.assertArrayEquals(RESULT_PAYLOAD.getBytes(), message.getPayload());
+            Assert.assertEquals(RESULT_TOPIC, message.getTopic());
+        } finally {
+            // Release the subscriber connection whether or not the assertions passed.
+            connection.disconnect();
+        }
+    }
+
+    /**
+     * Builds a topology whose spout listens on {@code TEST_TOPIC} and whose bolt publishes
+     * {@code RESULT_PAYLOAD} to {@code RESULT_TOPIC} for every tuple it receives.
+     */
+    public StormTopology buildMqttTopology(){
+        TopologyBuilder builder = new TopologyBuilder();
+
+        MqttOptions options = new MqttOptions();
+        options.setTopics(Arrays.asList(TEST_TOPIC));
+        options.setCleanConnection(false);
+        TestSpout spout = new TestSpout(new StringMessageMapper(), options);
+
+        MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
+            @Override
+            public MqttMessage toMessage(ITuple tuple) {
+                LOG.info("Received: {}", tuple);
+                return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes());
+            }
+        });
+
+        builder.setSpout("mqtt-spout", spout);
+        builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
+
+        return builder.createTopology();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-opentsdb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-opentsdb/pom.xml b/external/storm-opentsdb/pom.xml
index 0e01cdf..41f7e55 100644
--- a/external/storm-opentsdb/pom.xml
+++ b/external/storm-opentsdb/pom.xml
@@ -44,7 +44,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-pmml/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-pmml/pom.xml b/external/storm-pmml/pom.xml
index 2ddfeb3..b44c6f5 100644
--- a/external/storm-pmml/pom.xml
+++ b/external/storm-pmml/pom.xml
@@ -51,7 +51,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
 
         <!-- JPMML Compile Time dependencies              LICENSING WARNING!

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index 3c1ca03..102e0f5 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -49,7 +49,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
             <groupId>commons-codec</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-solr/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
index a7cc287..14922ed 100644
--- a/external/storm-solr/pom.xml
+++ b/external/storm-solr/pom.xml
@@ -42,7 +42,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
         <!-- Solr and its dependencies -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 896d735..0fef7c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -308,6 +308,9 @@
         <qpid.version>0.32</qpid.version>
         <eventhubs.client.version>1.0.1</eventhubs.client.version>
         <jersey.version>2.24.1</jersey.version>
+
+        <!-- see intellij profile below... This fixes an annoyance with intellij -->
+        <provided.scope>provided</provided.scope>
     </properties>
 
     <modules>
@@ -361,7 +364,8 @@
         <module>examples/storm-elasticsearch-examples</module>
         <module>examples/storm-mqtt-examples</module>
         <module>examples/storm-pmml-examples</module>
-        <module>storm-perf</module>
+        <module>examples/storm-jms-examples</module>
+        <module>examples/storm-perf</module>
     </modules>
 
     <dependencies>
@@ -374,6 +378,18 @@
     </dependencies>
 
     <profiles>
+        <!--
+            Hack to make intellij behave.
+            If you use intellij, enable this profile in your IDE.
+            It should make life easier.
+        -->
+        <profile>
+            <id>intellij</id>
+            <properties>
+                <provided.scope>compile</provided.scope>
+            </properties>
+        </profile>
+
         <profile>
             <id>rat</id>
             <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-dist/binary/final-package/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index a6aab07..9321ea2 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -42,8 +42,8 @@
             </includes>
         </fileSet>
         <!--
-            $STORM_HOME/bin
-        -->
+             $STORM_HOME/bin
+         -->
         <fileSet>
             <directory>${project.basedir}/../../../bin</directory>
             <outputDirectory>bin</outputDirectory>
@@ -56,7 +56,6 @@
         <!--
             Allow for variable substitution for cache busting in HTML files and script.js
             See storm-dist/binary/pom.xml for custom variables that will be substituted in.
-
             The source files should have a ${variable to be replaced} wherever
             maven assembly plugin should inject a value.
         -->
@@ -84,11 +83,14 @@
                 <exclude>js/script.js</exclude>
             </excludes>
         </fileSet>
+        <!-- EXAMPLES -->
         <fileSet>
             <directory>${project.basedir}/../../../examples</directory>
             <outputDirectory>examples</outputDirectory>
             <excludes>
                 <exclude>**/target/**</exclude>
+                <exclude>**/dependency-reduced-pom.xml</exclude>
+                <exclude>**/*.iml</exclude>
             </excludes>
         </fileSet>
         <fileSet>
@@ -99,13 +101,7 @@
             <outputDirectory>bin</outputDirectory>
         </fileSet>
         <!-- EXTERNAL -->
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-kafka/target</directory>
-            <outputDirectory>external/storm-kafka</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
+        <!-- only include the README file -->
         <fileSet>
             <directory>${project.basedir}/../../../external/storm-kafka</directory>
             <outputDirectory>external/storm-kafka</outputDirectory>
@@ -114,13 +110,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-kinesis/target</directory>
-            <outputDirectory>external/storm-kinesis</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-kinesis</directory>
             <outputDirectory>external/storm-kinesis</outputDirectory>
             <includes>
@@ -128,13 +117,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-hdfs/target</directory>
-            <outputDirectory>external/storm-hdfs</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-hdfs</directory>
             <outputDirectory>external/storm-hdfs</outputDirectory>
             <includes>
@@ -142,13 +124,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-hbase/target</directory>
-            <outputDirectory>external/storm-hbase</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-hbase</directory>
             <outputDirectory>external/storm-hbase</outputDirectory>
             <includes>
@@ -156,13 +131,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-jdbc/target</directory>
-            <outputDirectory>external/storm-jdbc</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-jdbc</directory>
             <outputDirectory>external/storm-jdbc</outputDirectory>
             <includes>
@@ -170,13 +138,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-hive/target</directory>
-            <outputDirectory>external/storm-hive</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-hive</directory>
             <outputDirectory>external/storm-hive</outputDirectory>
             <includes>
@@ -184,13 +145,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-eventhubs/target</directory>
-            <outputDirectory>external/storm-eventhubs</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-eventhubs</directory>
             <outputDirectory>external/storm-eventhubs</outputDirectory>
             <includes>
@@ -198,21 +152,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-eventhubs/src/main/resources</directory>
-            <outputDirectory>external/storm-eventhubs</outputDirectory>
-            <includes>
-                <include>Config.properties</include>
-            </includes>
-            <fileMode>0644</fileMode>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-elasticsearch/target</directory>
-            <outputDirectory>external/storm-elasticsearch</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-elasticsearch</directory>
             <outputDirectory>external/storm-elasticsearch</outputDirectory>
             <includes>
@@ -221,41 +160,6 @@
         </fileSet>
 
         <fileSet>
-            <directory>${project.basedir}/../../../external/flux/flux-core/target</directory>
-            <outputDirectory>external/flux</outputDirectory>
-            <includes>
-                <include>flux*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/flux/flux-wrappers/target</directory>
-            <outputDirectory>external/flux</outputDirectory>
-            <includes>
-                <include>flux*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/flux/flux-examples/target</directory>
-            <outputDirectory>external/flux</outputDirectory>
-            <includes>
-                <include>flux*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/flux/flux-examples</directory>
-            <outputDirectory>external/flux/examples</outputDirectory>
-            <includes>
-                <include>README.*</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/flux/flux-examples/src/main/resources</directory>
-            <outputDirectory>external/flux/examples</outputDirectory>
-            <includes>
-                <include>*</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/flux</directory>
             <outputDirectory>external/flux</outputDirectory>
             <includes>
@@ -263,13 +167,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-redis/target</directory>
-            <outputDirectory>external/storm-redis</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-redis</directory>
             <outputDirectory>external/storm-redis</outputDirectory>
             <includes>
@@ -277,26 +174,13 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-solr/target</directory>
-            <outputDirectory>external/storm-solr</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-solr</directory>
             <outputDirectory>external/storm-solr</outputDirectory>
             <includes>
                 <include>README.*</include>
             </includes>
         </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-cassandra/target</directory>
-            <outputDirectory>external/storm-cassandra</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
+        <!-- Storm SQL -->
         <fileSet>
             <directory>${project.basedir}/../../../external/sql/storm-sql-core/target/app-assembler/repo</directory>
             <outputDirectory>external/sql/storm-sql-core</outputDirectory>
@@ -347,7 +231,7 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
-
+        <!-- END Storm SQL -->
         <fileSet>
             <directory>${project.basedir}/../../../external/storm-mqtt</directory>
             <outputDirectory>external/storm-mqtt</outputDirectory>
@@ -356,27 +240,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-mqtt/core/target</directory>
-            <outputDirectory>external/storm-mqtt</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-mqtt/examples/target</directory>
-            <outputDirectory>external/storm-mqtt</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-mongodb/target</directory>
-            <outputDirectory>external/storm-mongodb</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-mongodb</directory>
             <outputDirectory>external/storm-mongodb</outputDirectory>
             <includes>
@@ -384,13 +247,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-kafka-client/target</directory>
-            <outputDirectory>external/storm-kafka-client</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-kafka-client</directory>
             <outputDirectory>external/storm-kafka-client</outputDirectory>
             <includes>
@@ -405,20 +261,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-opentsdb/target</directory>
-            <outputDirectory>external/storm-opentsdb</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-druid/target</directory>
-            <outputDirectory>external/storm-druid</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-druid</directory>
             <outputDirectory>external/storm-druid</outputDirectory>
             <includes>
@@ -440,28 +282,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-jms/core/target</directory>
-            <outputDirectory>external/storm-jms</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-jms/examples/target</directory>
-            <outputDirectory>external/storm-jms</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-
-        <fileSet>
-            <directory>${project.basedir}/../../../external/storm-pmml/target</directory>
-            <outputDirectory>external/storm-pmml</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-pmml</directory>
             <outputDirectory>external/storm-pmml</outputDirectory>
             <includes>
@@ -469,13 +289,6 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../../examples/storm-pmml-examples/target</directory>
-            <outputDirectory>examples/storm-pmml-examples</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../examples/storm-pmml-examples/src/main/resources</directory>
             <outputDirectory>examples/storm-pmml-examples</outputDirectory>
             <includes>
@@ -509,47 +322,9 @@
                 <include>storm*jar</include>
             </includes>
         </fileSet>
-
-        <fileSet>
-            <directory>${project.basedir}/../../storm-perf/target</directory>
-            <outputDirectory>perf</outputDirectory>
-            <includes>
-                <include>storm*jar</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../storm-perf/src/main/conf</directory>
-            <outputDirectory>perf/conf</outputDirectory>
-            <includes>
-                <include>*yaml</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../storm-perf/src/main/sampledata</directory>
-            <outputDirectory>perf/resources</outputDirectory>
-            <includes>
-                <include>*</include>
-            </includes>
-        </fileSet>
-        <fileSet>
-            <directory>${project.basedir}/../../storm-perf/</directory>
-            <outputDirectory>perf</outputDirectory>
-            <includes>
-                <include>README.*</include>
-            </includes>
-        </fileSet>
-
     </fileSets>
 
     <files>
-        <!-- EXAMPLES -->
-        <file>
-            <source>${project.basedir}/../../../examples/storm-starter/target/storm-starter-${project.version}.jar</source>
-            <outputDirectory>/examples/storm-starter/</outputDirectory>
-            <destName>storm-starter-topologies-${project.version}.jar</destName>
-        </file>
-
-
         <!--
             $STORM_HOME/conf
         -->
@@ -565,6 +340,18 @@
             <destName>storm_env.ini</destName>
             <fileMode>0644</fileMode>
         </file>
+        <file>
+            <source>${project.basedir}/../../../conf/storm-env.sh</source>
+            <outputDirectory>/conf</outputDirectory>
+            <destName>storm-env.sh</destName>
+            <fileMode>0755</fileMode>
+        </file>
+        <file>
+            <source>${project.basedir}/../../../VERSION</source>
+            <outputDirectory>/</outputDirectory>
+            <destName>RELEASE</destName>
+            <filtered>true</filtered>
+        </file>
         <!-- TODO this should be a generated file from "target" -->
         <file>
             <source>${project.basedir}/../../../log4j2/cluster.xml</source>
@@ -601,4 +388,5 @@
             <outputDirectory>/</outputDirectory>
         </file>
     </files>
+
 </assembly>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/README.markdown
----------------------------------------------------------------------
diff --git a/storm-perf/README.markdown b/storm-perf/README.markdown
deleted file mode 100644
index 946ab21..0000000
--- a/storm-perf/README.markdown
+++ /dev/null
@@ -1,50 +0,0 @@
-# Topologies for measuring Storm performance
-
-This module includes topologies designed for measuring Storm performance.
-
-## Overview
-There are two basic modes for running these topologies
-
-- **Cluster mode:** Submits the topology to a storm cluster. This mode is useful for benchmarking. It calculates throughput and latency numbers every minute and prints them on the console.
-- **In-process mode:** Uses LocalCluster to run the topology. This mode helps identify bottlenecks using profilers like JProfiler from within an IDE. This mode does not print metrics.
-
-In both modes, a shutdown hook is set up to terminate the topology when the program that is submitting the topology is terminated.
-
-The bundled topologies can be classified into two types.
-
-- Topologies that measure purely the internal functioning of Storm. Such topologies do not interact with external systems like Kafka or Hdfs.
-- Topologies that measure speed of I/O with external systems like Kafka and Hdfs.
-
-Topologies that measure internal performance can be run in either in-proc or cluster modes.
-Topologies that measure I/O with external systems are designed to run in cluster mode only.
-
-## Topologies List
-
-1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports in-proc and cluster mode.
-2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode.
-3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode.
-4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode.
-5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
-6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only.
-7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only.
-8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS.
-
-
-## How to run ?
-
-### In-process mode:
-This mode is intended for running the topology quickly and easily from within the IDE and does not expect any command line arguments.
-Simply running the Topology's main() method without any arguments will get it running. The topology runs indefinitely till the program is terminated.
-
-
-### Cluster mode:
-When the topology is run with one or more command-line arguments, it is submitted to the cluster.
-The first argument indicates how long the topology should be run. Often the second argument refers to a yaml config
-file which contains topology configuration settings. The conf/ directory in this module contains sample config files
-with names matching the corresponding topology.
-
-These topologies can be run using the standard storm jar command.
-
-```
-bin/storm jar  /path/storm-perf-1.1.0-jar-with-dependencies.jar org.apache.storm.perf.ConstSpoutNullBoltTopo  200  conf/ConstSpoutNullBoltTopo.yaml
-```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/storm-perf/pom.xml b/storm-perf/pom.xml
deleted file mode 100644
index 8d032d2..0000000
--- a/storm-perf/pom.xml
+++ /dev/null
@@ -1,122 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-
-    <groupId>org.apache.storm</groupId>
-    <artifactId>storm-perf</artifactId>
-    <packaging>jar</packaging>
-    <name>Storm Perf</name>
-    <description>Topologies and tools to measure performance.</description>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <!-- see comment below... This fixes an annoyance with intellij -->
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <archive>
-                        <manifest>
-                            <mainClass />
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-
-            </plugin>
-
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>exec-maven-plugin</artifactId>
-                <version>1.2.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>exec</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <executable>java</executable>
-                    <includeProjectDependencies>true</includeProjectDependencies>
-                    <includePluginDependencies>false</includePluginDependencies>
-                    <classpathScope>compile</classpathScope>
-                    <mainClass>${storm.topology}</mainClass>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <!--
-              Use "provided" scope to keep storm out of the jar-with-dependencies
-              For IntelliJ dev, intellij will load properly.
-            -->
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hdfs</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml b/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
deleted file mode 100644
index 9f74aee..0000000
--- a/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt1.count : 1  # IdBolt instances
-bolt2.count : 1  # DevNullBolt instances
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml b/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
deleted file mode 100644
index 51f2dd7..0000000
--- a/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-grouping : "local"  # either  "shuffle" or "local"
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/FileReadWordCountTopo.yaml b/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
deleted file mode 100644
index 61abe8f..0000000
--- a/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-splitter.count : 1
-counter.count : 1
-input.file : "/Users/roshan/Projects/idea/storm/storm-perf/src/main/resources/randomwords.txt"
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml b/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
deleted file mode 100644
index a06ad6e..0000000
--- a/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-hdfs.uri : "hdfs://hdfs.namenode:8020"
-hdfs.source.dir :  "/tmp/storm/in"
-hdfs.archive.dir : "/tmp/storm/done"
-hdfs.bad.dir : "/tmp/storm/bad"
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file


Mime
View raw message