nifi-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-5327) NetFlow Processors
Date Sat, 01 Sep 2018 02:16:02 GMT

    [ https://issues.apache.org/jira/browse/NIFI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599489#comment-16599489 ]

ASF GitHub Bot commented on NIFI-5327:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501922
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes
or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Shared Jackson ObjectMapper used to serialize parsed records as JSON content
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written to the FlowFile content or FlowFile attributes; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 fields will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new
HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord
});
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing
{}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing
to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    --- End diff ---
    
    If you go with `create` you can get rid of this. Something to keep in mind here is that, depending on the volume and velocity of the data, you could easily overpower the older provenance repository. The write-ahead one could probably handle it, but you might cause problems for people who didn't migrate (and I'm not sure whether the write-ahead one is the default now).
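
    For illustration, a minimal sketch of the `create` suggestion, reusing names from the diff above (`session`, `flowFile`, `mapper`, `results`, `multipleRecords`): creating each per-record child with `session.create(parent)` lets the framework record the fork to the child itself, so the explicit `modifyContent()` provenance call becomes unnecessary. This is a sketch of the reviewer's idea, not the PR's code:

        // Hypothetical replacement for the clone()/modifyContent() pair in generateJSON()
        FlowFile recordFlowFile = session.create(flowFile); // child of the original; provenance is handled by the framework
        recordFlowFile = session.write(recordFlowFile, out -> out.write(mapper.writeValueAsBytes(results)));
        recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        multipleRecords.add(recordFlowFile);

    On the repository point: the write-ahead implementation is the one selected in nifi.properties with nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository.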


> NetFlow Processors
> ------------------
>
>                 Key: NIFI-5327
>                 URL: https://issues.apache.org/jira/browse/NIFI-5327
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.6.0
>            Reporter: Prashanth Venkatesan
>            Assignee: Prashanth Venkatesan
>            Priority: Major
>
> As network traffic data is increasingly in scope for big data use cases, we would like NiFi to have processors that support parsing of those protocols.
> Netflow is a protocol introduced by Cisco that provides the ability to collect IP network traffic as it enters or exits an interface, described in detail here:
> [https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html]
>  
> Currently, I have created the following processor:
> *ParseNetflowv5*: Parses ingress netflowv5 bytes and ingests them as either NiFi flowfile attributes or JSON content. It also sends a one-time template.
>  
> Going forward, we can add more processors specific to network protocols in this NAR bundle.
> I will create a pull request.
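
For background, netflowv5 is a fixed binary layout: a 24-byte header followed by up to 30 fixed 48-byte flow records, per the Cisco format document linked above. A minimal illustrative sketch of reading the header fields with java.nio.ByteBuffer (this is not the PR's Netflowv5Parser, only the documented layout):

    import java.nio.ByteBuffer;

    public class Netflowv5HeaderSketch {
        public static void readHeader(final byte[] datagram) {
            // ByteBuffer defaults to big-endian, matching network byte order
            final ByteBuffer buf = ByteBuffer.wrap(datagram);
            final int version = buf.getShort() & 0xFFFF;           // always 5 for netflowv5
            final int count = buf.getShort() & 0xFFFF;             // number of 48-byte records that follow (1-30)
            final long sysUptime = buf.getInt() & 0xFFFFFFFFL;     // milliseconds since the exporting device booted
            final long unixSecs = buf.getInt() & 0xFFFFFFFFL;      // seconds since the Unix epoch
            final long unixNsecs = buf.getInt() & 0xFFFFFFFFL;     // residual nanoseconds
            final long flowSequence = buf.getInt() & 0xFFFFFFFFL;  // running count of flows seen by the exporter
            final int engineType = buf.get() & 0xFF;               // flow-switching engine type
            final int engineId = buf.get() & 0xFF;                 // flow-switching engine slot number
            final int samplingInterval = buf.getShort() & 0xFFFF;  // 2-bit sampling mode + 14-bit interval
            // Flow records begin at offset 24: src/dst addresses, ports, packet and byte counters, etc.
        }
    }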



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
