nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (NIFI-1808) Create General MQTT Processors
Date Tue, 17 May 2016 14:19:13 GMT


ASF GitHub Bot commented on NIFI-1808:

Github user olegz commented on a diff in the pull request:
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/
    @@ -0,0 +1,341 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.mqtt;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
    +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
    +@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
    +@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
    +    @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was
the message source"),
    +    @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which
message was received"),
    +    @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service
for this message."),
    +    @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not
this message might be a duplicate of one which has already been received."),
    +    @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not
this message was from a current publisher, or was \"retained\" by the server as the last message
published " +
    +            "on the topic.")})
    +public class ConsumeMQTT extends AbstractMQTTProcessor {
    +    public final static String BROKER_ATTRIBUTE_KEY =  "";
    +    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
    +    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
    +    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  "mqtt.isDuplicate";
    +    public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
    +    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder()
    +            .name("Topic Filter")
    +            .description("The MQTT topic filter to designate the topics to subscribe
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
    +            .name("Quality of Service(QoS)")
    +            .description("The Quality of Service(QoS) to receive the message with. Accepts
values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly
    +            .required(true)
    +            .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2)
    +            .build();
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Queue Size")
    +            .description("The MQTT messages are always being sent to subscribers on a
topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving
to this " +
    +                    "processor then a back up can occur. This property specifies the
maximum number of messages this processor will hold in memory at one time.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +    private static int DISCONNECT_TIMEOUT = 5000;
    +    private long maxQueueSize;
    --- End diff --
    Hate to be picky, but given that this is set in @OnSchduled but used elsewhere (other
threads) it needs to be _volatile_.

> Create General MQTT Processors
> ------------------------------
>                 Key: NIFI-1808
>                 URL:
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol that implementing
processors for would allow NiFi to continue expanding into the IoT domain. A prime opportunity
would be to use in conjunction with Apache NiFi - MiNiFi.
> [1]

This message was sent by Atlassian JIRA

View raw message