nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-1971 - Introduce QueryWhois processor with batching (i.e. netcat protocol) support
Date Sat, 01 Oct 2016 16:39:16 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 2054888fb -> 25b7dfa9b


NIFI-1971 - Introduce QueryWhois processor with batching (i.e. netcat protocol) support

This closes #858.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/25b7dfa9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/25b7dfa9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/25b7dfa9

Branch: refs/heads/master
Commit: 25b7dfa9b1ba8f2bda445a0ddfe55c198dffde38
Parents: 2054888
Author: Andre F de Miranda <trixpan@users.noreply.github.com>
Authored: Tue Aug 9 23:01:53 2016 +1000
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Sat Oct 1 18:38:58 2016 +0200

----------------------------------------------------------------------
 .../src/main/resources/META-INF/NOTICE          |  10 +
 .../nifi-enrich-processors/pom.xml              |  26 ++
 .../enrich/AbstractEnrichProcessor.java         | 100 +++++-
 .../apache/nifi/processors/enrich/QueryDNS.java |   3 +-
 .../nifi/processors/enrich/QueryWhois.java      | 347 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/enrich/TestQueryDNS.java    |   1 +
 .../nifi/processors/enrich/TestQueryWhois.java  | 223 ++++++++++++
 8 files changed, 698 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
index 13c7902..30f4cfb 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/src/main/resources/META-INF/NOTICE
@@ -51,6 +51,16 @@ The following binary components are provided under the Apache Software
License v
       Apache Commons Logging
       Copyright 2003-2013 The Apache Software Foundation
 
+  (ASLv2) Apache Commons Net
+      The following NOTICE information applies:
+        Apache Commons Net
+        Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) Guava
+    The following NOTICE information applies:
+      Guava
+      Copyright 2015 The Guava Authors
+      
   (ASLv2) GeoIP2 Java API
     The following NOTICE information applies:
       GeoIP2 Java API

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
index 3def405..2268a48 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
@@ -63,5 +63,31 @@
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.5</version>
+            <scope>test</scope>
+            </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito-common</artifactId>
+            <version>1.6.5</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
index c576ca2..0ba7934 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
@@ -18,6 +18,10 @@
 package org.apache.nifi.processors.enrich;
 
 
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -30,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -62,10 +67,21 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
     public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder()
             .name("QUERY_PARSER_INPUT")
             .displayName("Parser RegEx")
-            .description("Choice between a splitter and regex matcher used to parse the results
of the query into attribute groups")
+            .description("Choice between a splitter and regex matcher used to parse the results
of the query into attribute groups.\n" +
+            "NOTE: This is a multiline regular expression, therefore, the DFM should decide
how to handle trailing new line " +
+            "characters.")
             .expressionLanguageSupported(false)
             .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor KEY_GROUP = new PropertyDescriptor.Builder()
+            .name("KEY_GROUP")
+            .displayName("Key lookup group (multiline / batch)")
+            .description("When performing a batched lookup, the following RegEx numbered
capture group or Column number will be used to match " +
+                    "the whois server response with the lookup field")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
 
@@ -85,18 +101,20 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
         final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
 
         final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
-        final boolean QUERY_PARSER_INPUT_isSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet();
 
-        if ((!chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( !QUERY_PARSER_INPUT_isSet
)) {
+        if (!chosenQUERY_PARSER.equals(NONE.getValue())  &&  !validationContext.getProperty(QUERY_PARSER_INPUT).isSet()
) {
             results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+                    .subject(QUERY_PARSER.getDisplayName())
                     .explanation("Split and Regex parsers require a valid Regular Expression")
                     .valid(false)
                     .build());
         }
 
-        if ((chosenQUERY_PARSER.equals(NONE.getValue()) ) && ( QUERY_PARSER_INPUT_isSet
)) {
-            results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
-                    .explanation("NONE parser does not support the use of Regular Expressions")
+        if (chosenQUERY_PARSER.equals(NONE.getValue()) && validationContext.getProperty(QUERY_PARSER_INPUT).isSet())
{
+            results.add(new ValidationResult.Builder().input("QUERY_PARSER")
+                    .subject(QUERY_PARSER_INPUT.getDisplayName())
+                    .explanation("NONE parser does not support the use of Regular Expressions.
" +
+                            "Please select another parser or delete the regular expression
entered in this field.")
                     .valid(false)
                     .build());
         }
@@ -117,11 +135,11 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
      * @param queryRegex The regex to be used to split the query results into groups
      * @return  Map with attribute names and values
      */
-    protected Map<String, String> parseResponse(int recordPosition, String rawResult,
String queryParser, String queryRegex, String schema) {
+    protected Map<String, String> parseResponse(String recordPosition, String rawResult,
String queryParser, String queryRegex, String schema) {
 
         Map<String, String> results = new HashMap<>();
         Pattern p;
-
+        recordPosition = StringUtils.isEmpty(recordPosition) ? "0" : recordPosition;
 
         // Iterates  over the results using the QUERY_REGEX adding the captured groups
         // as it progresses
@@ -131,7 +149,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
                 // Time to Split the results...
                 String[] splitResult = rawResult.split(queryRegex);
                 for (int r = 0; r < splitResult.length; r++) {
-                    results.put("enrich." + schema + ".record" + String.valueOf(recordPosition)
+ ".group" + String.valueOf(r), splitResult[r]);
+                    results.put("enrich." + schema + ".record" + recordPosition + ".group"
+ String.valueOf(r), splitResult[r]);
                 }
                 break;
 
@@ -143,7 +161,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
                     // Note that RegEx matches capture group 0 is usually broad but starting
with it anyway
                     // for the sake of purity
                     for (int r = 0; r < finalResult.groupCount(); r++) {
-                        results.put("enrich." + schema + ".record" + String.valueOf(recordPosition)
+ ".group" + String.valueOf(r), finalResult.group(r));
+                        results.put("enrich." + schema + ".record" + recordPosition + ".group"
+ String.valueOf(r), finalResult.group(r));
                     }
                 }
                 break;
@@ -152,10 +170,68 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor
{
                 // Fails to NONE
             default:
                 // NONE was chosen, just appending the record result as group0 without further
splitting
-                results.put("enrich." + schema + ".record" + String.valueOf(recordPosition)
+ ".group0", rawResult);
+                results.put("enrich." + schema + ".record" + recordPosition + ".group0",
rawResult);
                 break;
         }
         return results;
     }
 
+    /**
+     * This method returns the parsed record string in the form of
+     * a map of two strings, consisting of a iteration aware attribute
+     * names and its values
+     *
+
+     * @param  rawResult the raw query results to be parsed
+     * @param queryParser The parsing mechanism being used to parse the data into groups
+     * @param queryRegex The regex to be used to split the query results into groups. The
regex MUST implement at least on named capture group "KEY" to be used to populate the table
rows
+     * @param lookupKey The regular expression number or the column of a split to be used
for matching
+     * @return  Table with attribute names and values where each Table row uses the value
of the KEY named capture group specified in @param queryRegex
+     */
+    protected Table<String, String, String> parseBatchResponse(String rawResult, String
queryParser, String queryRegex, int lookupKey, String schema) {
+        // Note the hardcoded record0.
+        //  Since iteration is done within the parser and Multimap is used, the record number
here will always be 0.
+        // Consequentially, 0 is hardcoded so that batched and non batched attributes follow
the same naming
+        // conventions
+        final String recordPosition = ".record0";
+
+        final Table<String, String, String> results = HashBasedTable.create();
+
+        switch (queryParser) {
+            case "Split":
+                Scanner scanner = new Scanner(rawResult);
+                while (scanner.hasNextLine()) {
+                    String line = scanner.nextLine();
+                    // Time to Split the results...
+                    String[] splitResult = line.split(queryRegex);
+
+                    for (int r = 0; r < splitResult.length; r++) {
+                        results.put(splitResult[ lookupKey - 1 ], "enrich." + schema + recordPosition
+ ".group" + String.valueOf(r), splitResult[r]);
+                    }
+                }
+                break;
+            case "RegEx":
+            // prepare the regex
+            Pattern p;
+            // Regex is multiline. Each line should include a KEY for lookup
+            p = Pattern.compile(queryRegex, Pattern.MULTILINE);
+
+            Matcher matcher = p.matcher(rawResult);
+            while (matcher.find()) {
+                try {
+                    // Note that RegEx matches capture group 0 is usually broad but starting
with it anyway
+                    // for the sake of purity
+                    for (int r = 0; r <= matcher.groupCount(); r++) {
+                        results.put(matcher.group(lookupKey), "enrich." + schema + recordPosition
+ ".group" + String.valueOf(r), matcher.group(r));
+                    }
+                } catch (IndexOutOfBoundsException e) {
+                    getLogger().warn("Could not find capture group {} while processing result.
You may want to review your " +
+                            "Regular Expression to match against the content \"{}\"", new
Object[]{lookupKey, rawResult});
+                }
+            }
+            break;
+        }
+
+        return results;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
index bb346c6..362bb2e 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
@@ -176,7 +176,8 @@ public class QueryDNS extends AbstractEnrichProcessor {
                     // While NXDOMAIN is being generated by doLookup catch
 
                     if (dnsRecord != "NXDOMAIN") {
-                        Map<String, String> parsedResults = parseResponse(recordNumber,
dnsRecord, queryParser, queryRegex, "dns");
+                        // Map<String, String> parsedResults = parseResponse(recordNumber,
dnsRecord, queryParser, queryRegex, "dns");
+                        Map<String, String> parsedResults = parseResponse(String.valueOf(recordNumber),
dnsRecord, queryParser, queryRegex, "dns");
                         flowFile = session.putAllAttributes(flowFile, parsedResults);
                         found = true;
                     } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
new file mode 100644
index 0000000..faf305b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.net.whois.WhoisClient;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"whois", "enrich", "ip"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("A powerful whois query processor primary designed to enrich DataFlows
with whois based APIs " +
+        "(e.g. ShadowServer's ASN lookup) but that can be also used to perform regular whois
lookups.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "enrich.dns.record*.group*", description = "The captured
fields of the Whois query response for each of the records received"),
+})
+public class QueryWhois extends AbstractEnrichProcessor {
+
+    public static final AllowableValue BEGIN_END = new AllowableValue("Begin/End", "Begin/End",
+            "The evaluated input of each flowfile is enclosed within begin and end tags.
Each row contains a delimited set of fields");
+
+    public static final AllowableValue BULK_NONE = new AllowableValue("None", "None",
+            "Queries are made without any particular dialect");
+
+
+    public static final PropertyDescriptor WHOIS_QUERY_TYPE = new PropertyDescriptor.Builder()
+            .name("WHOIS_QUERY_TYPE")
+            .displayName("Whois Query Type")
+            .description("The Whois query type to be used by the processor (if used)")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WHOIS_SERVER = new PropertyDescriptor.Builder()
+            .name("WHOIS_SERVER")
+            .displayName("Whois Server")
+            .description("The Whois server to be used")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WHOIS_SERVER_PORT = new PropertyDescriptor.Builder()
+            .name("WHOIS_SERVER_PORT")
+            .displayName("Whois Server Port")
+            .description("The TCP port of the remote Whois server")
+            .required(true)
+            .defaultValue("43")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WHOIS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("WHOIS_TIMEOUT")
+            .displayName("Whois Query Timeout")
+            .description("The amount of time to wait until considering a query as failed")
+            .required(true)
+            .defaultValue("1500 ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("BATCH_SIZE")
+            .displayName("Batch Size")
+            .description("The number of incoming FlowFiles to process in a single execution
of this processor. ")
+            .required(true)
+            .defaultValue("25")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BULK_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("BULK_PROTOCOL")
+            .displayName("Bulk Protocol")
+            .description("The protocol used to perform the bulk query. ")
+            .required(true)
+            .defaultValue(BULK_NONE.getValue())
+            .allowableValues(BEGIN_END, BULK_NONE)
+            .build();
+
+    @Override
+    public List<ValidationResult> customValidate(ValidationContext validationContext)
{
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final String chosenQUERY_PARSER = validationContext.getProperty(QUERY_PARSER).getValue();
+
+        if (!chosenQUERY_PARSER.equals(NONE.getValue())  &&  !validationContext.getProperty(QUERY_PARSER_INPUT).isSet()
) {
+            results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+                    .subject(QUERY_PARSER_INPUT.getDisplayName())
+                    .explanation("Split and Regex parsers require a valid Regular Expression")
+                    .valid(false)
+                    .build());
+        }
+
+
+        if (validationContext.getProperty(BATCH_SIZE).asInteger() > 1 &&   !validationContext.getProperty(KEY_GROUP).isSet()
)  {
+            results.add(new ValidationResult.Builder().input("KEY_GROUP")
+                    .subject(KEY_GROUP.getDisplayName())
+                    .explanation("when operating in Batching mode, RegEx and Split parsers
require a " +
+                            "valid capture group/matching column. Configure the processor
batch size to 1" +
+                            " or enter a valid column / named capture value.")
+                    .valid(false)
+                    .build());
+        }
+
+        if ( validationContext.getProperty(BATCH_SIZE).asInteger() > 1  && chosenQUERY_PARSER.equals(NONE.getValue())
 ) {
+            results.add(new ValidationResult.Builder().input(validationContext.getProperty(BATCH_SIZE).getValue())
+                    .subject(QUERY_PARSER.getDisplayName())
+                    .explanation("NONE parser does not support batching. Configure Batch
Size to 1 or use another parser.")
+                    .valid(false)
+                    .build());
+        }
+
+        if ( validationContext.getProperty(BATCH_SIZE).asInteger() == 1  && !validationContext.getProperty(BULK_PROTOCOL).getValue().equals(BULK_NONE.getValue())
) {
+            results.add(new ValidationResult.Builder().input("BULK_PROTOCOL")
+                    .subject(BATCH_SIZE.getDisplayName())
+                    .explanation("Bulk protocol requirement requires batching. Configure
Batch Size to more than 1 or " +
+                            "use another protocol.")
+                    .valid(false)
+                    .build());
+        }
+
+
+
+        return results;
+    }
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    private WhoisClient whoisClient;
+
+    static {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(QUERY_INPUT);
+        props.add(WHOIS_QUERY_TYPE);
+        props.add(WHOIS_SERVER);
+        props.add(WHOIS_SERVER_PORT);
+        props.add(WHOIS_TIMEOUT);
+        props.add(BATCH_SIZE);
+        props.add(BULK_PROTOCOL);
+        props.add(QUERY_PARSER);
+        props.add(QUERY_PARSER_INPUT);
+        props.add(KEY_GROUP);
+        propertyDescriptors = Collections.unmodifiableList(props);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_FOUND);
+        rels.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        List<FlowFile> flowFiles = session.get(batchSize);
+
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        // Build query
+        String buildString = "";
+        final String queryType = context.getProperty(WHOIS_QUERY_TYPE).getValue();
+
+        // Verify the the protocol mode and craft the "begin" pseudo-command, otherwise just
the query type
+        buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue())
 ? buildString.concat("begin ") : buildString.concat("");
+
+        // Append the query type
+        buildString = context.getProperty(WHOIS_QUERY_TYPE).isSet()  ? buildString.concat(queryType
+ " " ) : buildString.concat("");
+
+        // A new line is required when working on Begin/End
+        buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue())
? buildString.concat("\n") : buildString.concat("");
+
+        // append the values
+        for (FlowFile flowFile : flowFiles) {
+            final String evaluatedInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
+            buildString = buildString + evaluatedInput + "\n";
+        }
+
+        // Verify the the protocol mode and craft the "end" pseudo-command, otherwise just
the query type
+        buildString = context.getProperty(BULK_PROTOCOL).getValue().equals(BEGIN_END.getValue())
 ? buildString.concat("end") : buildString.concat("");
+
+
+        final String queryParser = context.getProperty(QUERY_PARSER).getValue();
+        final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue();
+        final int keyLookup = context.getProperty(KEY_GROUP).asInteger();
+        final int whoisTimeout = context.getProperty(WHOIS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final String whoisServer = context.getProperty(WHOIS_SERVER).getValue();
+        final int whoisPort = context.getProperty(WHOIS_SERVER_PORT).asInteger();
+
+        final List<FlowFile> flowFilesMatched = new ArrayList<FlowFile>();
+        final List<FlowFile> flowFilesNotMatched = new ArrayList<FlowFile>();
+
+
+        String result = doLookup(whoisServer, whoisPort, whoisTimeout, buildString);
+        if (StringUtils.isEmpty(result)) {
+            // If nothing was returned, let the processor continue its life and transfer
the batch to REL_NOT_FOUND
+            session.transfer(flowFiles, REL_NOT_FOUND);
+            return;
+        } else {
+            // Run as normal
+            for (FlowFile flowFile : flowFiles) {
+                // Check the batchSize. If 1, run normal parser
+                if (batchSize == 1) {
+
+                    Map<String, String> parsedResults = parseResponse(null, result,
queryParser, queryRegex, "whois");
+
+                    if (parsedResults.isEmpty()) {
+                        // parsedResults didn't return anything valid, sending to not found.
+                        flowFilesNotMatched.add(flowFile);
+                    } else {
+                        // Still, extraction is needed
+                        flowFile = session.putAllAttributes(flowFile, parsedResults);
+                        flowFilesMatched.add(flowFile);
+
+                        // Finished processing single result
+                    }
+                } else {
+                    // Otherwise call the multiline parser and get the row map;
+                    final Map<String, Map<String, String>> rowMap = parseBatchResponse(result,
queryParser, queryRegex, keyLookup, "whois").rowMap();
+
+                    // Identify the flowfile Lookupvalue and search against the rowMap
+                    String ffLookupValue = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
+
+                    if (rowMap.containsKey(ffLookupValue)) {
+                        // flowfile Lookup Value is contained within the results, get the
properties and add to matched list
+                        flowFile = session.putAllAttributes(flowFile, rowMap.get(ffLookupValue));
+                        flowFilesMatched.add(flowFile);
+                    } else {
+                        // otherwise add to Not Matched
+                        flowFilesNotMatched.add(flowFile);
+                    }
+                }
+            }
+        }
+
+        // Finally prepare to send the data down the pipeline
+        // Because batches may include matches and non-matches, test both and send
+        // each to its relationship
+        if (flowFilesMatched.size() > 0) {
+            // Sending the resulting flowfile (with attributes) to REL_FOUND
+            session.transfer(flowFilesMatched, REL_FOUND);
+        }
+        if (flowFilesNotMatched.size() > 0) {
+            // Sending whatetver didn't match to REL_NOT_FOUND
+            session.transfer(flowFilesNotMatched, REL_NOT_FOUND);
+        }
+    }
+
+    /**
+     * This method performs a simple Whois lookup
+     * @param whoisServer Server to be queried;
+    *  @param whoisPort TCP port to be useed to connect to server
+     * @param whoisTimeout How long to wait for a response (in ms);
+     * @param query The query to be made;
+     */
+    protected String doLookup(String whoisServer, int whoisPort, int whoisTimeout, String
query) {
+        // This is a simple WHOIS lookup attempt
+
+        String result = null;
+
+        whoisClient = createClient();
+
+        try {
+            // Uses pre-existing context to resolve
+            if (!whoisClient.isConnected()) {
+                whoisClient.connect(whoisServer, whoisPort);
+                whoisClient.setSoTimeout(whoisTimeout);
+                result = whoisClient.query(query);
+                // clean up...
+                if (whoisClient.isConnected()) whoisClient.disconnect();
+            }
+        } catch ( IOException e) {
+            getLogger().error("Query failed due to {}", new Object[]{e.getMessage()}, e);
+            throw new ProcessException("Error performing Whois Lookup", e);
+        }
+        return result;
+    }
+
+
+    /*
+    Note createClient() was separated from the rest of code
+    in order to allow powermock to inject a fake return
+    during testing
+     */
+    protected WhoisClient createClient() {
+        return new WhoisClient();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e4a39f5..5871892 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
 
 org.apache.nifi.processors.GeoEnrichIP
 org.apache.nifi.processors.enrich.QueryDNS
+org.apache.nifi.processors.enrich.QueryWhois
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
index 593ff08..6e15f93 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
@@ -155,6 +155,7 @@ public class TestQueryDNS  {
 
     @Test
     public void testInvalidData()  {
+        queryDNSTestRunner.removeProperty(QueryDNS.QUERY_PARSER_INPUT);
         queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, "AAAA");
         queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
         queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000 ms");

http://git-wip-us.apache.org/repos/asf/nifi/blob/25b7dfa9/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
new file mode 100644
index 0000000..4c0a1f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryWhois.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+
+import org.apache.commons.net.whois.WhoisClient;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WhoisClient.class})
+public class TestQueryWhois {
+    private QueryWhois queryWhois;
+    private TestRunner queryWhoisTestRunner;
+
+    @Before
+    public void setupTest() throws Exception {
+        // This is what is sent by Mockito
+        String header = "AS      | IP               | BGP Prefix          | CC | Registry
| Allocated  | Info                    | AS Name\n";
+        String responseBodyLine1 =  "999 | 123.123.123.123 | 123.123.123.123/32 | AU | apnic
| 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
+        String responseBodyLine2 =  "333 | 124.124.124.124 | 124.124.124.124/32 | AU | apnic
| 2014-01-01 | 2016-08-14 01:32:01 GMT | Apache NiFi\n";
+
+        WhoisClient whoisClient = PowerMockito.mock(WhoisClient.class);
+        Mockito.when(whoisClient.query(Mockito.anyString())).thenReturn(header + responseBodyLine1
+ responseBodyLine2);
+
+        this.queryWhois =  new QueryWhois() {
+            @Override
+            protected WhoisClient createClient(){
+                return whoisClient;
+            }
+        };
+        this.queryWhoisTestRunner = TestRunners.newTestRunner(queryWhois);
+
+    }
+
+
+
+    @Test
+    public void testCustomValidator() {
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "peer");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "nifi.apache.org");
+
+        // Note the absence of a QUERY_PARSER_INPUT value
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.assertNotValid();
+
+
+        // Note the presence of a QUERY_PARSER_INPUT value
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+        queryWhoisTestRunner.assertValid();
+
+        // Note BULK_PROTOCOL and BATCH_SIZE
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+        queryWhoisTestRunner.setProperty(QueryWhois.BULK_PROTOCOL, QueryWhois.BEGIN_END.getValue());
+        queryWhoisTestRunner.assertNotValid();
+
+        // Note the presence of a QUERY_PARSER_INPUT value while NONE is set
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "1");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+        queryWhoisTestRunner.assertNotValid();
+//
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+        queryWhoisTestRunner.removeProperty(QueryWhois.QUERY_PARSER_INPUT);
+        queryWhoisTestRunner.assertNotValid();
+
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+        queryWhoisTestRunner.removeProperty(QueryWhois.KEY_GROUP);
+        queryWhoisTestRunner.assertNotValid();
+
+        queryWhoisTestRunner.setProperty(QueryWhois.BATCH_SIZE, "10");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\|");
+        queryWhoisTestRunner.removeProperty(QueryWhois.KEY_GROUP);
+        queryWhoisTestRunner.assertNotValid();
+
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.NONE.getValue());
+        queryWhoisTestRunner.assertNotValid();
+
+
+    }
+
+
+    @Test
+    public void testValidDataWithSplit()  {
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4,
'.'):trim()}" +
+                ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(1, '.'):trim()}");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.SPLIT.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\\s+\\|\\s+");
+        queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
+
+        final Map<String, String> attributeMap1 = new HashMap<>();
+        final Map<String, String> attributeMap2 = new HashMap<>();
+        final Map<String, String> attributeMap3 = new HashMap<>();
+        attributeMap1.put("ip_address", "123.123.123.123");
+        attributeMap2.put("ip_address", "124.124.124.124");
+        attributeMap3.put("ip_address", "125.125.125.125");
+
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+        queryWhoisTestRunner.run();
+
+        List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+        assertTrue(matchingResults.size() == 2);
+        List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+        assertTrue(nonMatchingResults.size() == 1);
+
+        matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group7", "Apache
NiFi");
+    }
+
+    @Test
+    public void testValidDataWithRegex()  {
+
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+        queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "2");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4,
'.'):trim()}" +
+                ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(1, '.'):trim()}");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
+
+        final Map<String, String> attributeMap1 = new HashMap<>();
+        final Map<String, String> attributeMap2 = new HashMap<>();
+        final Map<String, String> attributeMap3 = new HashMap<>();
+        attributeMap1.put("ip_address", "123.123.123.123");
+        attributeMap2.put("ip_address", "124.124.124.124");
+        attributeMap3.put("ip_address", "125.125.125.125");
+
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+        queryWhoisTestRunner.run();
+
+        List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+        assertTrue(matchingResults.size() == 2);
+        List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+        assertTrue(nonMatchingResults.size() == 1);
+
+        matchingResults.get(0).assertAttributeEquals("enrich.whois.record0.group8", " Apache
NiFi");
+
+    }
+
+    @Test
+    public void testValidDataWithRegexButInvalidCaptureGroup()  {
+
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_SERVER, "127.0.0.1");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_QUERY_TYPE, "origin");
+        queryWhoisTestRunner.setProperty(QueryWhois.WHOIS_TIMEOUT, "1000 ms");
+        queryWhoisTestRunner.setProperty(QueryWhois.KEY_GROUP, "9");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_INPUT, "${ip_address:getDelimitedField(4,
'.'):trim()}" +
+                ".${ip_address:getDelimitedField(3, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(2, '.'):trim()}" +
+                ".${ip_address:getDelimitedField(1, '.'):trim()}");
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER, QueryWhois.REGEX.getValue());
+        queryWhoisTestRunner.setProperty(QueryWhois.QUERY_PARSER_INPUT, "\n^([^\\|]*)\\|\\s+(\\S+)\\s+\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|([^\\|]*)\\|(.*)$");
+
+        final Map<String, String> attributeMap1 = new HashMap<>();
+        final Map<String, String> attributeMap2 = new HashMap<>();
+        final Map<String, String> attributeMap3 = new HashMap<>();
+        attributeMap1.put("ip_address", "123.123.123.123");
+        attributeMap2.put("ip_address", "124.124.124.124");
+        attributeMap3.put("ip_address", "125.125.125.125");
+
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap1);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap2);
+        queryWhoisTestRunner.enqueue(new byte[0], attributeMap3);
+        queryWhoisTestRunner.run();
+
+        List<MockFlowFile> matchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_FOUND);
+        assertTrue(matchingResults.size() == 0);
+        List<MockFlowFile> nonMatchingResults = queryWhoisTestRunner.getFlowFilesForRelationship(QueryWhois.REL_NOT_FOUND);
+        assertTrue(nonMatchingResults.size() == 3);
+
+    }
+
+}
+


Mime
View raw message