nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject [nifi] branch main updated: NIFI-8658: Allow Filter Functions and expressions to be specified as a RecordPaths
Date Fri, 04 Jun 2021 21:52:48 GMT
This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c9dee30  NIFI-8658: Allow Filter Functions and expressions to be specified as a RecordPaths
c9dee30 is described below

commit c9dee3029422a172c181d6c2d8d8e54e2ecd013e
Author: Mark Payne <markap14@hotmail.com>
AuthorDate: Thu Jun 3 17:26:01 2021 -0400

    NIFI-8658: Allow Filter Functions and expressions to be specified as a RecordPaths
    
    NIFI-8658: Addressed issue where the RecordField that was provided from Function Filters were not accurate
    
    Signed-off-by: Matthew Burgess <mattyb149@apache.org>
    
    This closes #5125
---
 .../org/apache/nifi/record/path/RecordPathParser.g |  9 +--
 .../record/path/filter/BinaryOperatorFilter.java   | 31 +++++++++-
 .../nifi/record/path/filter/FunctionFilter.java    | 13 +++-
 .../apache/nifi/record/path/filter/NotFilter.java  | 12 +++-
 .../nifi/record/path/filter/RecordPathFilter.java  |  6 +-
 .../nifi/record/path/functions/FilterFunction.java | 53 ++++++++++++++++
 .../nifi/record/path/paths/RecordPathCompiler.java | 24 ++++++++
 .../apache/nifi/record/path/TestRecordPath.java    | 22 +++++++
 .../processors/standard/TestPartitionRecord.java   | 71 +++++++++++++++++++---
 .../nifi/processors/standard/TestUpdateRecord.java | 15 +++++
 10 files changed, 236 insertions(+), 20 deletions(-)

diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
index 682cd7e..19ea9e9 100644
--- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
+++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -128,9 +128,9 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL
| EQU
 
 literal : NUMBER | STRING_LITERAL;
 
-expression : path | literal | function;
+expression : path | literal | standaloneFunction;
 
-operation : expression operator^ expression;
+operation : expression (operator^ expression)?;
 
 filter : filterFunction | operation;
 
@@ -149,7 +149,7 @@ optionalArgument : argument?;
 argumentList : optionalArgument (COMMA argument)* ->
 	^(ARGUMENTS optionalArgument argument*);
 
-function : IDENTIFIER LPAREN argumentList RPAREN ->
+standaloneFunction : IDENTIFIER LPAREN argumentList RPAREN ->
 	^(FUNCTION IDENTIFIER argumentList);
 
 
@@ -176,6 +176,7 @@ notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
 filterFunction : simpleFilterFunction | notFilterFunction;
 
 
+anyFunction : standaloneFunction | filterFunction;
 
 //
 // References
@@ -233,7 +234,7 @@ relativePath : currentOrParent relativePathSegment? ->
 
 path : absolutePath | relativePath;
 
-pathOrFunction : path | function;
+pathOrFunction : filter;
 
 pathExpression : pathOrFunction EOF ->
 	^(PATH_EXPRESSION pathOrFunction);
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
index 6e7ea96..791cd60 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
@@ -17,12 +17,15 @@
 
 package org.apache.nifi.record.path.filter;
 
-import java.util.Optional;
-import java.util.stream.Stream;
-
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
 import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Optional;
+import java.util.stream.Stream;
 
 public abstract class BinaryOperatorFilter implements RecordPathFilter {
     private final RecordPathSegment lhs;
@@ -55,6 +58,28 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter
{
     }
 
     @Override
+    public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext context)
{
+        final Stream<FieldValue> rhsStream = rhs.evaluate(context);
+        final Optional<FieldValue> firstMatch = rhsStream
+            .filter(fieldVal -> fieldVal.getValue() != null)
+            .findFirst();
+
+        if (!firstMatch.isPresent()) {
+            return Stream.empty();
+        }
+
+        final FieldValue fieldValue = firstMatch.get();
+        final Object value = fieldValue.getValue();
+
+        final Stream<FieldValue> lhsStream = lhs.evaluate(context);
+        return lhsStream.map(fieldVal -> {
+            final boolean result = test(fieldVal, value);
+            final FieldValue mapped = new StandardFieldValue(result, new RecordField(getOperator(),
RecordFieldType.BOOLEAN.getDataType()), null);
+            return mapped;
+        });
+    }
+
+    @Override
     public String toString() {
         return lhs + " " + getOperator() + " " + rhs;
     }
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
index c7e6114..caa27e7 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
@@ -17,11 +17,14 @@
 
 package org.apache.nifi.record.path.filter;
 
-import java.util.stream.Stream;
-
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
 import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.stream.Stream;
 
 public abstract class FunctionFilter implements RecordPathFilter {
     private final RecordPathSegment recordPath;
@@ -36,5 +39,11 @@ public abstract class FunctionFilter implements RecordPathFilter {
             .filter(fv -> invert ? !test(fv, context) : test(fv, context));
     }
 
+    @Override
+    public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext context)
{
+        return recordPath.evaluate(context)
+            .map(fv -> new StandardFieldValue(test(fv, context), new RecordField("<function>",
RecordFieldType.BOOLEAN.getDataType()), null));
+    }
+
     protected abstract boolean test(FieldValue fieldValue, final RecordPathEvaluationContext
context);
 }
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
index bbe38ed..c07f036 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
@@ -17,10 +17,13 @@
 
 package org.apache.nifi.record.path.filter;
 
-import java.util.stream.Stream;
-
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.stream.Stream;
 
 public class NotFilter implements RecordPathFilter {
     private final RecordPathFilter filter;
@@ -34,5 +37,10 @@ public class NotFilter implements RecordPathFilter {
         return filter.filter(context, !invert);
     }
 
+    @Override
+    public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext context)
{
+        return filter.mapToBoolean(context)
+            .map(fieldValue -> new StandardFieldValue(!Boolean.TRUE.equals(fieldValue.getValue()),
new RecordField("not()", RecordFieldType.BOOLEAN.getDataType()), null));
+    }
 
 }
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
index 389f6d3..409081a 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
@@ -17,13 +17,15 @@
 
 package org.apache.nifi.record.path.filter;
 
-import java.util.stream.Stream;
-
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPathEvaluationContext;
 
+import java.util.stream.Stream;
+
 public interface RecordPathFilter {
 
     Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean invert);
 
+    Stream<FieldValue> mapToBoolean(RecordPathEvaluationContext context);
+
 }
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
new file mode 100644
index 0000000..60f5860
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.filter.RecordPathFilter;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+import java.util.stream.Stream;
+
+/**
+ * <p>
+ * A Filter Function is responsible for taking a RecordPathFilter and turning it into a function that is capable of
+ * being evaluated against a RecordPathEvaluationContext such that the return value is a Stream&lt;FieldValue&gt; whose
+ * values are booleans.
+ * </p>
+ *
+ * <p>
+ * So while a RecordPathFilter would be evaluated against a RecordPathEvaluationContext and return a Stream of FieldValues representing
+ * all elements that match the filter, the FilterFunction would instead return a true/false for each element indicating whether or not
+ * it passes the filter.
+ * </p>
+ */
+public class FilterFunction extends RecordPathSegment {
+
+    private final RecordPathFilter filter;
+
+    public FilterFunction(final String functionName, final RecordPathFilter filter, final
boolean absolute) {
+        super(functionName, null, absolute);
+        this.filter = filter;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return filter.mapToBoolean(context);
+    }
+}
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 7ef2255..d966382 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -41,6 +41,7 @@ import org.apache.nifi.record.path.functions.Coalesce;
 import org.apache.nifi.record.path.functions.Concat;
 import org.apache.nifi.record.path.functions.EscapeJson;
 import org.apache.nifi.record.path.functions.FieldName;
+import org.apache.nifi.record.path.functions.FilterFunction;
 import org.apache.nifi.record.path.functions.Format;
 import org.apache.nifi.record.path.functions.Hash;
 import org.apache.nifi.record.path.functions.PadLeft;
@@ -104,6 +105,18 @@ public class RecordPathCompiler {
             parent = RecordPathCompiler.buildPath(child, parent, absolute);
         }
 
+        // If the given path tree is an operator, create a Filter Function that will be responsible
for returning true/false based on the provided operation
+        switch (pathTree.getType()) {
+            case EQUAL:
+            case NOT_EQUAL:
+            case LESS_THAN:
+            case LESS_THAN_EQUAL:
+            case GREATER_THAN:
+            case GREATER_THAN_EQUAL:
+                final RecordPathFilter filter = createFilter(pathTree, null, absolute);
+                return new FilterFunction(pathTree.getText(), filter, absolute);
+        }
+
         return parent;
     }
 
@@ -364,6 +377,17 @@ public class RecordPathCompiler {
 
                         return new Coalesce(argPaths, absolute);
                     }
+                    case "not":
+                    case "contains":
+                    case "containsRegex":
+                    case "endsWith":
+                    case "startsWith":
+                    case "isBlank":
+                    case "isEmpty":
+                    case "matchesRegex": {
+                        final RecordPathFilter filter = createFilter(tree, null, absolute);
+                        return new FilterFunction(functionName, filter, absolute);
+                    }
                     default: {
                         throw new RecordPathException("Invalid function call: The '" + functionName
+ "' function does not exist or can only "
                             + "be used within a predicate, not as a standalone function");
diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 5a6057f..0c89b2e 100644
--- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -1880,6 +1880,28 @@ public class TestRecordPath {
         assertEquals(Uuid5Util.fromString(input, null), value);
     }
 
+    @Test
+    public void testPredicateAsPath() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", null);
+        final Record record = new MapRecord(schema, values);
+
+        assertEquals(Boolean.TRUE, RecordPath.compile("isEmpty( /name )").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(Boolean.FALSE, RecordPath.compile("isEmpty( /id )").evaluate(record).getSelectedFields().findFirst().get().getValue());
+
+        assertEquals(Boolean.TRUE, RecordPath.compile("/id = 48").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(Boolean.FALSE, RecordPath.compile("/id > 48").evaluate(record).getSelectedFields().findFirst().get().getValue());
+
+        assertEquals(Boolean.FALSE, RecordPath.compile("not(/id = 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+    }
+
     private List<RecordField> getDefaultFields() {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
index 32546c5..933ad47 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
@@ -17,13 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.IntStream;
-
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -34,6 +27,13 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestPartitionRecord {
 
     private TestRunner runner;
@@ -93,6 +93,63 @@ public class TestPartitionRecord {
     }
 
     @Test
+    public void groupByIsEmpty() {
+        runner.setProperty("unknown-age", "isEmpty( /age )");
+        runner.setProperty("another-unknown", "isEmpty( /nonExistentField )");
+
+        readerService.addRecord("John", 28, null);
+        readerService.addRecord("Jake", 49, null);
+        readerService.addRecord("Mark", null, null);
+        readerService.addRecord("Jane", 20, null);
+        readerService.addRecord("Jake", 14, null);
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count());
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("4")).count());
+
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("unknown-age").equals("true")).count());
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("unknown-age").equals("false")).count());
+
+        out.forEach(ff -> ff.assertAttributeEquals("another-unknown", "true"));
+    }
+
+    @Test
+    public void testExpressionAsPath() {
+        runner.setProperty("adult", "/age >= 18");
+
+        readerService.addRecord("John", 28, null);
+        readerService.addRecord("Jake", 49, null);
+        readerService.addRecord("Mark", null, null);
+        readerService.addRecord("Jane", 20, null);
+        readerService.addRecord("Jake", 14, null);
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count());
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("3")).count());
+
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("adult").equals("true")).count());
+        assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("adult").equals("false")).count());
+    }
+
+    @Test
     public void testGroupByIntAllRecordsTogether() {
         runner.setProperty("age", "/age");
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index c519869..068047d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -115,6 +115,21 @@ public class TestUpdateRecord {
     }
 
     @Test
+    public void testRecordPathReplacementWithFilterFunctionCall() {
+        runner.setProperty("/hasAge", "not(isEmpty(/age))");
+        runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue());
+        runner.enqueue("");
+
+        readerService.addRecord("John Doe", 35);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0);
+        out.assertContentEquals("header\nJohn Doe,35,true\n");
+
+    }
+
+    @Test
     public void testInvalidRecordPathUsingExpressionLanguage() {
         runner.setProperty("/name", "${recordPath}");
         runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue());

Mime
View raw message