nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1767) AWS IoT processors
Date Wed, 01 Jun 2016 18:52:59 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310876#comment-15310876
] 

ASF GitHub Bot commented on NIFI-1767:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65420794
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/PutAWSIoT.java
---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +
    +@Tags({"Amazon", "AWS", "IOT", "MQTT", "Websockets", "Put", "Publish", "Send"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Publishes messages to MQTT-topic(s) of AWS IoT. The processor
keeps open a WebSocket connection and will automatically renew the " +
    +        "connection to overcome Amazon's service limit on maximum connection duration.
Most of the " +
    +        "configuration can be overridden by values coming in as message attributes. This
applies for " +
    +        "the topic (corresponding message attribute is \"aws.iot.mqtt.topic.override\"),
the qos-level " +
    +        "(\"aws.iot.mqtt.qos.override\") and the retention (\"aws.iot.mqtt.retained.override\")")
    +@SeeAlso({ PutAWSIoTShadow.class })
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.topic.override", description = "Overrides
the processor configuration for topic."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.qos.override", description = "Overrides
the processor configuration for quality of service."),
    +        @ReadsAttribute(attribute = "aws.iot.mqtt.retained.override", description = "Overrides
the processor configuration for retaining a published state in the AWS shadow.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aws.iot.mqtt.exception", description = "Error details")
    +})
    +public class PutAWSIoT extends AbstractAWSIoTProcessor {
    +    private final static String PROP_NAME_RETAINED = "aws.iot.mqtt.retained";
    +    private final static String ATTR_NAME_TOPIC = PROP_NAME_TOPIC + ".override";
    +    private final static String ATTR_NAME_QOS = PROP_NAME_QOS + ".override";
    +    private final static String ATTR_NAME_RETAINED = PROP_NAME_RETAINED + ".override";
    +    private final static String ATTR_NAME_EXCEPTION = "aws.iot.mqtt.exception";
    +    private final static Boolean PROP_DEFAULT_RETAINED = false;
    +    private Boolean shouldRetain;
    +
    +    public static final PropertyDescriptor PROP_RETAINED = new PropertyDescriptor
    +            .Builder().name(PROP_NAME_RETAINED)
    +            .description("For messages being published, a true setting indicates that
the MQTT server " +
    +                    "should retain a copy of the message. The message will then be transmitted
to new " +
    +                    "subscribers to a topic that matches the message topic. For subscribers
registering " +
    +                    "a new subscription, the flag being true indicates that the received
message is not " +
    +                    "a new one, but one that has been retained by the MQTT server.")
    +            .required(true)
    +            .defaultValue(PROP_DEFAULT_RETAINED.toString())
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(
    +                    PROP_QOS,
    +                    PROP_TOPIC,
    +                    PROP_RETAINED,
    +                    PROP_ENDPOINT,
    +                    PROP_KEEPALIVE,
    +                    PROP_CLIENT,
    +                    AWS_CREDENTIALS_PROVIDER_SERVICE,
    +                    TIMEOUT,
    +                    REGION));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    --- End diff --
    
    Only the success relationship is exposed, should also include the failure relationship.


> AWS IoT processors
> ------------------
>
>                 Key: NIFI-1767
>                 URL: https://issues.apache.org/jira/browse/NIFI-1767
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Kay Lerch
>         Attachments: 20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf
>
>
> Four new processors to communicate with Amazon’s managed device gateway service AWS
IoT.
> h5.Use cases
> * Consume reported states from a fleet of things managed and secured on Amazon’s gateway
service
> * Propagate desired states to a fleet of things managed and secured on Amazon’s gateway
service
> * Intercept M2M communication
> * Hybrid IoT solutions: brings together a managed device gateway in the cloud and onpremise
data-consumers and -providers.
> h4.GetIOTMqtt:
> Opens up a connection to an AWS-account-specific websocket endpoint in order to subscribe
to any of the MQTT topics belonging to a registered thing in AWS IoT.
> h4.PutIOTMqtt
> Opens up a connection to an AWS-account-specific websocket endpoint in order to publish
messages to any of the MQTT topics belonging to a registered thing in AWS IoT.
> h4.GetIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by the so-called
thing shadow. This processor reads out the current state of a shadow (persisted as JSON) by
requesting the managed API of AWS IoT.
> h4.PutIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by the so-called
thing shadow. This processor updates the current state of a shadow (persisted as JSON) by
requesting the managed API of AWS IoT. An update to a shadow lets AWS IoT propagate changes
to the MQTT topics of the thing.
> h5.Known issues:
> * It was hard for me to write appropriate integration tests since the MQTT processors
work with durable websocket-connections which are kind of tough to test. With your help I
would love to do a better job on testing and hand it in later on. All of the processors were
tested in a live-scenario which ran over a longer period of time. Didn’t observe any issue.
> * I got rid of all the properties for the deprecated AWSCredentialProviderService and
only made use of AWSCredentialsProviderControllerService. If both are still necessary for
backward-compatibilities sake I would add the deprecated feature.
> Refers to Pull Request 349: https://github.com/apache/nifi/pull/349



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message