nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1808) Create General MQTT Processors
Date Tue, 17 May 2016 14:43:13 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286744#comment-15286744
] 

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63534793
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
    +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.io.OutputStream;
    +import java.io.IOException;
    +
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +
    +@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
    +@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
    +@SeeAlso({PublishMQTT.class})
    +@WritesAttributes({
    +    @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was
the message source"),
    +    @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which
message was received"),
    +    @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service
for this message."),
    +    @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not
this message might be a duplicate of one which has already been received."),
    +    @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not
this message was from a current publisher, or was \"retained\" by the server as the last message
published " +
    +            "on the topic.")})
    +public class ConsumeMQTT extends AbstractMQTTProcessor {
    +    public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
    +    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
    +    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
    +    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  "mqtt.isDuplicate";
    +    public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
    +
    +    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder()
    +            .name("Topic Filter")
    +            .description("The MQTT topic filter to designate the topics to subscribe
to.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
    +            .name("Quality of Service(QoS)")
    +            .description("The Quality of Service(QoS) to receive the message with. Accepts
values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly
once'.")
    +            .required(true)
    +            .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Queue Size")
    +            .description("The MQTT messages are always being sent to subscribers on a
topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving
to this " +
    +                    "processor then a back up can occur. This property specifies the
maximum number of messages this processor will hold in memory at one time.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +
    +    private static int DISCONNECT_TIMEOUT = 5000;
    +    private long maxQueueSize;
    +
    +    private volatile boolean subscribed = false;
    +    private volatile int qos;
    +    private volatile String topicFilter;
    +
    +    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
    +
    +    // For Testing
    +    public LinkedBlockingQueue<MQTTQueueMessage> getMqttQueue(){
    +        return mqttQueue;
    +    }
    +
    +    public static final Relationship REL_MESSAGE = new Relationship.Builder()
    +            .name("Message")
    +            .description("The MQTT message output")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> descriptors;
    +    private static final Set<Relationship> relationships;
    +
    +    static{
    +        final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
    +        innerDescriptorsList.add(PROP_TOPIC_FILTER);
    +        innerDescriptorsList.add(PROP_QOS);
    +        innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
    +        descriptors = Collections.unmodifiableList(innerDescriptorsList);
    +
    +        final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
    +        innerRelationshipsSet.add(REL_MESSAGE);
    +        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
    +        // resize the receive buffer, but preserve data
    +        if (descriptor == PROP_MAX_QUEUE_SIZE) {
    +            // it's a mandatory integer, never null
    +            int newSize = Integer.valueOf(newValue);
    +            if (mqttQueue != null) {
    +                int msgPending = mqttQueue.size();
    +                if (msgPending > newSize) {
    +                    logger.debug("New receive buffer size ({}) is smaller than the number
of messages pending ({}), ignoring resize request",
    +                            new Object[]{newSize, msgPending});
    +                    return;
    +                }
    +                LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
    +                mqttQueue.drainTo(newBuffer);
    +                mqttQueue = newBuffer;
    +            }
    +
    +        }
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(ValidationContext context)
{
    +        final Collection<ValidationResult> results = super.customValidate(context);
    +        int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
    +        if (mqttQueue == null) {
    +            mqttQueue = new LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger());
    +        }
    +        int msgPending = mqttQueue.size();
    +        if (msgPending > newSize) {
    +            results.add(new ValidationResult.Builder()
    +                    .valid(false)
    +                    .subject("ConsumeMQTT Configuration")
    +                    .explanation(String.format("%s (%d) is smaller than the number of
messages pending (%d).",
    +                            PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, msgPending))
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        logger = getLogger();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException
{
    +        qos = context.getProperty(PROP_QOS).asInteger();
    +        maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
    +        topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
    +
    +        buildClient(context);
    +    }
    +
    +    @OnUnscheduled
    +    public void onUnscheduled(final ProcessContext context) {
    +        try {
    +            synchronized (mqttClientConnectLock) {
    +                if(mqttClient != null && mqttClient.isConnected()) {
    +                    mqttClient.disconnect(DISCONNECT_TIMEOUT);
    +                    logger.info("Disconnected the MQTT client.");
    +                }
    +            }
    +        } catch(MqttException me) {
    +            logger.error("Failed when disconnecting the MQTT client.", me);
    +        }
    +    }
    --- End diff --
    
    Yup, you're correct. I will add a check for if it is scheduled or not.


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol that implementing
processors for would allow NiFi to continue expanding into the IoT domain. A prime opportunity
would be to use in conjunction with Apache NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message