From issues-return-63806-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Sat Sep 1 04:15:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D675A1807B4 for ; Sat, 1 Sep 2018 04:15:53 +0200 (CEST) Received: (qmail 4696 invoked by uid 500); 1 Sep 2018 02:15:53 -0000 Mailing-List: contact issues-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list issues@nifi.apache.org Received: (qmail 4564 invoked by uid 99); 1 Sep 2018 02:15:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Sep 2018 02:15:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1F3EE04A3; Sat, 1 Sep 2018 02:15:52 +0000 (UTC) From: MikeThomsen To: issues@nifi.apache.org Reply-To: issues@nifi.apache.org References: In-Reply-To: Subject: [GitHub] nifi pull request #2820: NIFI-5327 Adding Netflowv5 protocol parser Content-Type: text/plain Message-Id: <20180901021552.A1F3EE04A3@git1-us-west.apache.org> Date: Sat, 1 Sep 2018 02:15:52 +0000 (UTC) Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2820#discussion_r214501899 --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.network; + +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields; +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import 
org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processors.network.parser.Netflowv5Parser; +import org.apache.nifi.stream.io.StreamUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" }) +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.") +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") }) +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."), + @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") }) + +public class ParseNetflowv5 extends AbstractProcessor { + private String destination; + // Add mapper + private static final ObjectMapper mapper = new ObjectMapper(); + + public static final String DESTINATION_CONTENT = "flowfile-content"; + public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute"; + public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination") + .description("Indicates 
whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES + + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.") + .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build(); + + public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION)); + public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS))); + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + destination = context.getProperty(FIELDS_DESTINATION).getValue(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final OptionalInt portNumber = resolvePort(flowFile); + final Netflowv5Parser parser = new Netflowv5Parser(portNumber); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + 
session.read(flowFile, new InputStreamCallback() { + + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + final int processedRecord; + try { + processedRecord = parser.parse(buffer); + getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord }); + } catch (Throwable e) { + getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile }); + session.transfer(flowFile, REL_FAILURE); + return; + } + + try { + final List multipleRecords = new ArrayList<>(); + switch (destination) { + case DESTINATION_ATTRIBUTES: + final Map attributes = new HashMap<>(); + generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord); + break; + case DESTINATION_CONTENT: + generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer); + break; + } + // Create a provenance event recording the routing to success + multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS)); + session.getProvenanceReporter().route(flowFile, REL_ORIGINAL); + // Ready to transfer and commit + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(multipleRecords, REL_SUCCESS); + session.adjustCounter("Records Processed", processedRecord, false); + session.commit(); + } catch (Exception e) { + // The flowfile has failed parsing & validation, routing to failure + getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e }); + // Create a provenance event recording the routing to failure + session.getProvenanceReporter().route(flowFile, REL_FAILURE); + session.transfer(flowFile, REL_FAILURE); + session.commit(); + return; + } finally { + session.rollback(); + } + } + + private void generateJSON(final List multipleRecords, final ProcessSession session, final FlowFile flowFile, final 
Netflowv5Parser parser, final int processedRecord, final byte[] buffer) + throws JsonProcessingException { + int numberOfRecords = processedRecord; + FlowFile recordFlowFile = flowFile; + int record = 0; + while (numberOfRecords-- > 0) { + ObjectNode results = mapper.createObjectNode(); + // Add Port number and message format + results.set("port", mapper.valueToTree(parser.getPortNumber())); + results.set("format", mapper.valueToTree("netflowv5")); + + recordFlowFile = session.clone(flowFile); --- End diff -- I think `session.create(FlowFile)` is the right one to use here because it will establish a parent-child relationship between the newly created record and the input flowfile. I think `clone` also copies the content, so that could bog NiFi down a bit as well. ---