nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joewitt <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...
Date Sun, 24 Apr 2016 02:17:30 GMT
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60837830
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
---
    @@ -0,0 +1,350 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
    +import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
    +
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
    +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka.
The messages to send may be individual FlowFiles or may be delimited, using a "
    +        + "user-specified delimiter, such as a new-line.")
    +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value
of a given Kafka configuration property.",
    +                 description = "These properties will be added on the Kafka configuration
after loading any provided configuration properties."
    +        + " In the event a dynamic property represents a property that was already set
as part of the static properties, its value wil be"
    +        + " overriden with warning message describing the override."
    +        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
    +public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
    +
    +    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
    +
    +    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
    +
    +    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
    +
    +    protected static final String FAILED_KEY_ATTR = "failed.key";
    +
    +    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
    +
    +    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee
Replicated Delivery",
    +            "FlowFile will be routed to failure unless the message is replicated to the
appropriate "
    +                    + "number of Kafka Nodes according to the Topic configuration");
    +    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee
Single Node Delivery",
    +            "FlowFile will be routed to success if the message is received by a single
Kafka node, "
    +                    + "whether or not it is replicated. This is faster than <Guarantee
Replicated Delivery> "
    +                    + "but can result in data loss if a Kafka node crashes");
    +    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best
Effort",
    +            "FlowFile will be routed to success after successfully writing the content
to a Kafka node, "
    +                    + "without waiting for a response. This provides the best performance
but may result in data loss.");
    +
    +
    +    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(),
    +            RoundRobinPartitioner.class.getSimpleName(),
    +            "Messages will be assigned partitions in a round-robin fashion, sending the
first message to Partition 1, "
    +                    + "the next Partition to Partition 2, and so on, wrapping as necessary.");
    +    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    +            "DefaultPartitioner", "Messages will be assigned to random partitions.");
    +
    +    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.ACKS_CONFIG)
    +            .displayName("Delivery Guarantee")
    +            .description("Specifies the requirement for guaranteeing that a message is
sent to Kafka. Corresponds to Kafka's 'acks' property.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
    +            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
    +            .build();
    +    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
    +            .displayName("Meta Data Wait Time")
    +            .description("The amount of time KafkaConsumer will wait to obtain metadata
during the 'send' call before failing the "
    +                            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms'
property")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("30 sec")
    +            .build();
    +    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
    +            .name("kafka-key")
    +            .displayName("Kafka Key")
    +            .description("The Key to use for the Message")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER
    +            .description("Specifies the string (interpreted as UTF-8) to use for demarcating
apart multiple messages within "
    +                    + "a single FlowFile. If not specified, the entire content of the
FlowFile will be used as a single message. If specified, the "
    +                            + "contents of the FlowFile will be split on this delimiter
and each section sent as a separate Kafka message. "
    +                            + "To enter special character such as 'new line' use Shift+Enter.")
    +            .build();
    +    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
    +            .displayName("Partitioner class")
    +            .description("Specifies which class to use to compute a partition id for
a message. Corresponds to Kafka's 'partitioner.class' property.")
    +            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
    +            .defaultValue(RANDOM_PARTITIONING.getValue())
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
    +            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
    +            .displayName("Compression Type")
    +            .description("This parameter allows you to specify the compression codec
for all data generated by this producer.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .allowableValues("none", "gzip", "snappy", "lz4")
    +            .defaultValue("none")
    +            .build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Any FlowFile that cannot be sent to Kafka will be routed to
this Relationship")
    +            .build();
    +
    +    static final List<PropertyDescriptor> descriptors;
    +
    +    static final Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that list of PropertyDescriptors is build only once, since
    +     * all other lifecycle methods are invoked multiple times.
    +     */
    +    static {
    +        List<PropertyDescriptor> _descriptors = new ArrayList<>();
    +        _descriptors.addAll(sharedDescriptors);
    +        _descriptors.add(DELIVERY_GUARANTEE);
    +        _descriptors.add(KEY);
    +        _descriptors.add(MESSAGE_DEMARCATOR);
    +        _descriptors.add(META_WAIT_TIME);
    +        _descriptors.add(PARTITION_CLASS);
    +        _descriptors.add(COMPRESSION_CODEC);
    +
    +        descriptors = Collections.unmodifiableList(_descriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.addAll(sharedRelationships);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    /**
    +     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile}
    +     * producing a result {@link FlowFile}.
    +     * <br>
    +     * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS}
    +     * <br>
    +     * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE}
    +     *
    +     */
    +    @Override
    +    protected void rendezvousWithKafka(ProcessContext context, ProcessSession session)
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    +            if (!this.isFailedFlowFile(flowFile)) {
    +                session.getProvenanceReporter().send(flowFile, context.getProperty(BOOTSTRAP_SERVERS)
    --- End diff --
    
    I agree that the list Pierre provided for provenance events is good to have.  'kafka'
is good to have as the scheme.  I do see how the multiple broker URLs can be problematic, but
I believe Kafka does expose which broker we talked to.  If not, then we can use the first one,
but I'm pretty sure that information is available.  This is important information for provenance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message