nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From olegz <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-1808 Creating general MQTT processors
Date Tue, 17 May 2016 15:50:48 GMT
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63549263
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
    +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.io.OutputStream;
    +import java.io.IOException;
    +
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +
    +@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
    +@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
    +@SeeAlso({PublishMQTT.class})
    +@WritesAttributes({
    +    @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was
the message source"),
    +    @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which
message was received"),
    +    @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service
for this message."),
    +    @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not
this message might be a duplicate of one which has already been received."),
    +    @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not
this message was from a current publisher, or was \"retained\" by the server as the last message
published " +
    +            "on the topic.")})
    +public class ConsumeMQTT extends AbstractMQTTProcessor {
    +    public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
    +    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
    +    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
    +    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  "mqtt.isDuplicate";
    +    public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
    +
    +    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder()
    +            .name("Topic Filter")
    +            .description("The MQTT topic filter to designate the topics to subscribe
to.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
    +            .name("Quality of Service(QoS)")
    +            .description("The Quality of Service(QoS) to receive the message with. Accepts
values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly
once'.")
    +            .required(true)
    +            .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Queue Size")
    +            .description("The MQTT messages are always being sent to subscribers on a
topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving
to this " +
    +                    "processor then a back up can occur. This property specifies the
maximum number of messages this processor will hold in memory at one time.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +
    +    private static int DISCONNECT_TIMEOUT = 5000;
    +    private long maxQueueSize;
    +
    +    private volatile boolean subscribed = false;
    +    private volatile int qos;
    +    private volatile String topicFilter;
    +
    +    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
    +
    +    // For Testing
    +    public LinkedBlockingQueue<MQTTQueueMessage> getMqttQueue(){
    +        return mqttQueue;
    +    }
    +
    +    public static final Relationship REL_MESSAGE = new Relationship.Builder()
    +            .name("Message")
    +            .description("The MQTT message output")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> descriptors;
    +    private static final Set<Relationship> relationships;
    +
    +    static{
    +        final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
    +        innerDescriptorsList.add(PROP_TOPIC_FILTER);
    +        innerDescriptorsList.add(PROP_QOS);
    +        innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
    +        descriptors = Collections.unmodifiableList(innerDescriptorsList);
    +
    +        final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
    +        innerRelationshipsSet.add(REL_MESSAGE);
    +        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
    +        // resize the receive buffer, but preserve data
    +        if (descriptor == PROP_MAX_QUEUE_SIZE) {
    +            // it's a mandatory integer, never null
    +            int newSize = Integer.valueOf(newValue);
    +            if (mqttQueue != null) {
    +                int msgPending = mqttQueue.size();
    +                if (msgPending > newSize) {
    +                    logger.debug("New receive buffer size ({}) is smaller than the number
of messages pending ({}), ignoring resize request",
    +                            new Object[]{newSize, msgPending});
    +                    return;
    +                }
    +                LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
    +                mqttQueue.drainTo(newBuffer);
    +                mqttQueue = newBuffer;
    +            }
    +
    +        }
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(ValidationContext context)
{
    +        final Collection<ValidationResult> results = super.customValidate(context);
    +        int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
    +        if (mqttQueue == null) {
    +            mqttQueue = new LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger());
    +        }
    +        int msgPending = mqttQueue.size();
    +        if (msgPending > newSize) {
    +            results.add(new ValidationResult.Builder()
    +                    .valid(false)
    +                    .subject("ConsumeMQTT Configuration")
    +                    .explanation(String.format("%s (%d) is smaller than the number of
messages pending (%d).",
    +                            PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, msgPending))
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        logger = getLogger();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException
{
    +        qos = context.getProperty(PROP_QOS).asInteger();
    +        maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
    +        topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
    +
    +        buildClient(context);
    +    }
    +
    +    @OnUnscheduled
    +    public void onUnscheduled(final ProcessContext context) {
    +        try {
    +            synchronized (mqttClientConnectLock) {
    +                if(mqttClient != null && mqttClient.isConnected()) {
    +                    mqttClient.disconnect(DISCONNECT_TIMEOUT);
    +                    logger.info("Disconnected the MQTT client.");
    +                }
    +            }
    +        } catch(MqttException me) {
    +            logger.error("Failed when disconnecting the MQTT client.", me);
    +        }
    +    }
    +
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) throws IOException {
    +        if(processSessionFactory != null) {
    +            logger.info("Finishing processing leftover messages");
    +            ProcessSession session = processSessionFactory.createSession();
    +            transferQueue(session);
    +        } else {
    +            if (mqttQueue!= null && mqttQueue.size() > 0){
    +                throw new ProcessException("Attempting to stop processor but stored ProcessSessionFactory
is not set, there are messages in the MQTT Queue and it is configured to " +
    +                        "finish processing the messages.");
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
    +        if (mqttQueue.isEmpty() && !isConnected()){
    +            logger.info("Queue is empty and client is not connected. Attempting to reconnect.");
    +
    +            try {
    +                reconnect();
    +            } catch (MqttException e) {
    +                logger.error("Connection to " + broker + " lost (or was never connected)
and ontrigger connect failed. Yielding processor", e);
    +                context.yield();
    +            }
    +        }
    +
    +        if (mqttQueue.isEmpty()) {
    +            return;
    +        }
    +
    +        transferQueue(session);
    +    }
    +
    +    private void transferQueue(ProcessSession session){
    +        while (!mqttQueue.isEmpty()) {
    +            FlowFile messageFlowfile = session.create();
    +            final MQTTQueueMessage mqttMessage = mqttQueue.peek();
    +
    +            Map<String, String> attrs = new HashMap<>();
    +            attrs.put(BROKER_ATTRIBUTE_KEY, broker);
    +            attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
    +            attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
    +            attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
    +            attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
    +
    +            messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
    +
    +            messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback()
{
    +                @Override
    +                public void process(final OutputStream out) throws IOException {
    +                    out.write(mqttMessage.getPayload());
    +                }
    +            });
    +
    +            String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
    +            session.getProvenanceReporter().receive(messageFlowfile, transitUri);
    +            session.transfer(messageFlowfile, REL_MESSAGE);
    +            mqttQueue.remove(mqttMessage);
    +            session.commit();
    +        }
    +    }
    +
    +    private class ConsumeMQTTCallback implements MqttCallback {
    +
    +        @Override
    +        public void connectionLost(Throwable cause) {
    +            logger.warn("Connection to " + broker + " lost", cause);
    +            try {
    +                reconnect();
    +            } catch (MqttException e) {
    +                logger.error("Connection to " + broker + " lost and callback re-connect
failed.");
    +            }
    +        }
    +
    +        @Override
    +        public void messageArrived(String topic, MqttMessage message) throws Exception
{
    +            logger.info("MQTT message arrived on topic:" + topic);
    +            if (mqttQueue.size() >= maxQueueSize){
    +                throw new IllegalStateException("The subscriber queue is full, cannot
receive another message until the processor is scheduled to run.");
    +            } else {
    +                mqttQueue.add(new MQTTQueueMessage(topic, message));
    +            }
    +        }
    +
    +        @Override
    +        public void deliveryComplete(IMqttDeliveryToken token) {
    +            logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token);
    +        }
    +    }
    +
    +    // Public for testing
    +    public void reconnect() throws MqttException {
    +        synchronized (mqttClientConnectLock) {
    +            if (!mqttClient.isConnected()) {
    +                mqttClient = getMqttClient(broker, clientID, persistence);
    +                mqttClient.setCallback(new ConsumeMQTTCallback());
    +                mqttClient.connect(connOpts);
    +                if(!subscribed){
    +                    mqttClient.subscribe(topicFilter, qos);
    +                    subscribed = true;
    +                }
    +            }
    +        }
    +    }
    +
    +    // Public for testing
    +    public boolean isConnected(){
    --- End diff --
    
    "minimal in its scope of impact" - maybe for today, but you cannot possibly answer
this question for tomorrow. In fact, by making it public one can argue you've created the 'scope'
(however minimal it may be) when it didn't even have to exist in the first place.
    As for an example, just get the 'mqttClient' instance using the _Class.getDeclaredField_
dance and perform the same evaluation.
    ```
    ConsumeMQTT mqtt = ...
    Field f = mqtt.getClass().getDeclaredField("mqttClient");
    f.setAccessible(true);
    MqttClient mqttClient = f.get(mqtt);
    // do your check on mqttClient
    ```
    Put it in some private method within your test cases (some abstract class or some util
class) and you're done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message