nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1808) Create General MQTT Processors
Date Tue, 17 May 2016 18:53:13 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15287279#comment-15287279
] 

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63581819
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
---
    @@ -0,0 +1,342 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt.common;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.eclipse.paho.client.mqttv3.IMqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
    +
    +    protected ProcessorLog logger;
    +    protected IMqttClient mqttClient;
    +    protected final Object mqttClientConnectLock = new Object();
    +    protected volatile String broker;
    +    protected volatile String clientID;
    +    protected MqttConnectOptions connOpts;
    +    protected MemoryPersistence persistence = new MemoryPersistence();
    +    protected long maxTimeout;
    +
    +    public ProcessSessionFactory processSessionFactory;
    +
    +    public static final Validator QoSValidator = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, ValidationContext
context) {
    +            Integer inputInt = Integer.parseInt(input);
    +            if (inputInt < 0 || inputInt > 2) {
    +                return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS
must be an integer between 0 and 2").build();
    +            }
    +            return new ValidationResult.Builder().subject(subject).valid(true).build();
    +        }
    +    };
    +
    +    public static final Validator BrokerValidator = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, ValidationContext
context) {
    +            try{
    +                URI brokerURI = new URI(input);
    +                if (!"".equals(brokerURI.getPath())) {
    +                    return new ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
    --- End diff --
    
    As a whole yes, I was pointing to the line above and if you can use an existing validator
for it.


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol that implementing
processors for would allow NiFi to continue expanding into the IoT domain. A prime opportunity
would be to use in conjunction with Apache NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message