nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jw...@apache.org
Subject nifi git commit: NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included
Date Wed, 06 Sep 2017 04:32:16 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 20a6374bf -> bfd6c0aef


NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in
the input, instead of requiring that the 'wrapper record' returned from Lookup be included

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2110.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfd6c0ae
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfd6c0ae
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfd6c0ae

Branch: refs/heads/master
Commit: bfd6c0aef768f53be8fdea62afdcd5404099e089
Parents: 20a6374
Author: Mark Payne <markap14@hotmail.com>
Authored: Fri Aug 25 16:22:43 2017 -0400
Committer: James Wing <jvwing@gmail.com>
Committed: Tue Sep 5 21:29:16 2017 -0700

----------------------------------------------------------------------
 .../nifi/processors/standard/LookupRecord.java  |  56 +++++-
 .../processors/standard/TestLookupRecord.java   | 173 +++++++++++++++++++
 2 files changed, 224 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 10539bc..286f7ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -95,6 +95,11 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String,
RecordPa
         "Records will be routed to either a 'matched' or an 'unmatched' Relationship depending
on whether or not there was a match in the configured Lookup Service. "
             + "A single input FlowFile may result in two different output FlowFiles.");
 
+    static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record",
"Insert Entire Record",
+        "The entire Record that is retrieved from the Lookup Service will be inserted into
the destination path.");
+    static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields",
"Insert Record Fields",
+        "All of the fields in the Record that is retrieved from the Lookup Service will be
inserted into the destination path.");
+
     static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
         .name("lookup-service")
         .displayName("Lookup Service")
@@ -114,6 +119,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String,
RecordPa
         .required(false)
         .build();
 
+    static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder()
+        .name("result-contents")
+        .displayName("Record Result Contents")
+        .description("When a result is obtained that contains a Record, this property determines
whether the Record itself is inserted at the configured "
+            + "path or if the contents of the Record (i.e., the sub-fields) will be inserted
at the configured path.")
+        .allowableValues(RESULT_ENTIRE_RECORD, RESULT_RECORD_FIELDS)
+        .defaultValue(RESULT_ENTIRE_RECORD.getValue())
+        .required(true)
+        .build();
+
     static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
         .name("routing-strategy")
         .displayName("Routing Strategy")
@@ -161,6 +176,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String,
RecordPa
         properties.add(LOOKUP_SERVICE);
         properties.add(RESULT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
+        properties.add(RESULT_CONTENTS);
         return properties;
     }
 
@@ -272,14 +288,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String,
RecordPa
             lookupCoordinates.put(coordinateKey, coordinateValue);
         }
 
-        final Optional<?> lookupValue;
+        final Optional<?> lookupValueOption;
         try {
-            lookupValue = lookupService.lookup(lookupCoordinates);
+            lookupValueOption = lookupService.lookup(lookupCoordinates);
         } catch (final Exception e) {
             throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates
+ " in Lookup Service", e);
         }
 
-        if (!lookupValue.isPresent()) {
+        if (!lookupValueOption.isPresent()) {
             final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION
: SUCCESS_COLLECTION;
             return rels;
         }
@@ -289,9 +305,39 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String,
RecordPa
         if (resultPath != null) {
             record.incorporateSchema(writeSchema);
 
-            final Object replacementValue = lookupValue.get();
+            final Object lookupValue = lookupValueOption.get();
             final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
-            resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
+
+            final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
+            if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue
instanceof Record) {
+                final Record lookupRecord = (Record) lookupValue;
+
+                // Use wants to add all fields of the resultant Record to the specified Record
Path.
+                // If the destination Record Path returns to us a Record, then we will add
all field values of
+                // the Lookup Record to the destination Record. However, if the destination
Record Path returns
+                // something other than a Record, then we can't add the fields to it. We
can only replace it,
+                // because it doesn't make sense to add fields to anything but a Record.
+                resultPathResult.getSelectedFields().forEach(fieldVal -> {
+                    final Object destinationValue = fieldVal.getValue();
+
+                    if (destinationValue instanceof Record) {
+                        final Record destinationRecord = (Record) destinationValue;
+
+                        for (final String fieldName : lookupRecord.getRawFieldNames()) {
+                            final Object value = lookupRecord.getValue(fieldName);
+                            destinationRecord.setValue(fieldName, value);
+                        }
+                    } else {
+                        final Optional<Record> parentOption = fieldVal.getParentRecord();
+
+                        if (parentOption.isPresent()) {
+                            parentOption.get().setValue(fieldVal.getField().getFieldName(),
lookupRecord);
+                        }
+                    }
+                });
+            } else {
+                resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue));
+            }
         }
 
         final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION
: SUCCESS_COLLECTION;

http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index b84f518..29966e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -17,19 +17,30 @@
 
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.lookup.StringLookupService;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -227,6 +238,137 @@ public class TestLookupRecord {
     }
 
 
+    @Test
+    public void testAddFieldsToExistingRecord() throws InitializationException, IOException
{
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("favorite", RecordFieldType.STRING);
+        recordReader.addSchemaField("least", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, null, "baseball");
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+        out.assertContentEquals("John Doe,48,basketball,soccer\n");
+    }
+
+    /**
+     * If the output fields are added to a record that doesn't exist, the result should be
that a Record is
+     * created and the results added to it.
+     */
+    @Test
+    public void testAddFieldsToNonExistentRecord() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.RECORD);
+
+        recordReader.addRecord("John Doe", 48, null);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        // We can't be sure of the order of the fields in the record, so we allow either
'least' or 'favorite' to be first
+        final String outputContents = new String(out.toByteArray());
+        assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+            || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+    }
+
+    /**
+     * If the output fields are added to a non-record field, then the result should be that
the field
+     * becomes a UNION that does allow the Record and the value is set to a Record.
+     */
+    @Test
+    public void testAddFieldsToNonRecordField() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, null);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        // We can't be sure of the order of the fields in the record, so we allow either
'least' or 'favorite' to be first
+        final String outputContents = new String(out.toByteArray());
+        assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+            || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+    }
+
 
     private static class MapLookup extends AbstractControllerService implements StringLookupService
{
         private final Map<String, String> values = new HashMap<>();
@@ -260,4 +402,35 @@ public class TestLookupRecord {
         }
     }
 
+    private static class RecordLookup extends AbstractControllerService implements RecordLookupService
{
+        private final Map<String, Record> values = new HashMap<>();
+
+        public void addValue(final String key, final Record value) {
+            values.put(key, value);
+        }
+
+        @Override
+        public Class<?> getValueType() {
+            return String.class;
+        }
+
+        @Override
+        public Optional<Record> lookup(final Map<String, String> coordinates)
{
+            if (coordinates == null) {
+                return Optional.empty();
+            }
+
+            final String key = coordinates.get("lookup");
+            if (key == null) {
+                return Optional.empty();
+            }
+
+            return Optional.ofNullable(values.get(key));
+        }
+
+        @Override
+        public Set<String> getRequiredKeys() {
+            return Collections.singleton("lookup");
+        }
+    }
 }


Mime
View raw message