nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject nifi git commit: NIFI-4649 Added FlattenJson processor.
Date Fri, 05 Jan 2018 23:15:40 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 89fb1b37d -> d9866c75e


NIFI-4649 Added FlattenJson processor.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2307

Replaced star imports, removed unused import

Added explanation for invalid Expression


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d9866c75
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d9866c75
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d9866c75

Branch: refs/heads/master
Commit: d9866c75e2475986ed799c9fb75c622fc38f5a4f
Parents: 89fb1b3
Author: Mike Thomsen <mikerthomsen@gmail.com>
Authored: Thu Nov 30 07:23:53 2017 -0500
Committer: Matthew Burgess <mattyb149@apache.org>
Committed: Fri Jan 5 18:14:30 2018 -0500

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   5 +
 .../nifi/processors/standard/FlattenJson.java   | 145 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestFlattenJson.groovy  | 137 ++++++++++++++++++
 4 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index a96631a..2557d1a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -303,6 +303,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.github.wnameless</groupId>
+            <artifactId>json-flattener</artifactId>
+            <version>0.4.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.bval</groupId>
             <artifactId>bval-jsr</artifactId>
             <version>1.1.2</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
new file mode 100644
index 0000000..0406157
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.FlattenMode;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageCompiler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({ "json", "flatten" })
+@CapabilityDescription(
+        "Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
+        "document. The keys are combined at each level with a user-defined separator that defaults to '.'"
+)
+@SideEffectFree
+public class FlattenJson extends AbstractProcessor {
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .description("Successfully flattened files go to this relationship.")
+            .name("success")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .description("Files that cannot be flattened go to this relationship.")
+            .name("failure")
+            .build();
+
+    static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
+            .name("flatten-json-separator")
+            .displayName("Separator")
+            .defaultValue(".")
+            .description("The separator character used for joining keys. Must be a JSON-legal character.")
+            .addValidator((subject, input, context) -> {
+                if (context.isExpressionLanguagePresent(input)) {
+                    ExpressionLanguageCompiler elc = context.newExpressionLanguageCompiler();
+                    final boolean validExpression = elc.isValidExpression(input);
+                    return new ValidationResult.Builder().subject(subject).input(input)
+                            .valid(validExpression).explanation(validExpression ? "": "Not a valid Expression").build();
+                }
+
+                boolean valid = input != null && input.length() == 1;
+                String message = !valid ? "The separator must be a single character in length." : "";
+
+                ObjectMapper mapper = new ObjectMapper();
+                String test = String.format("{ \"prop%sprop\": \"test\" }", input);
+                try {
+                    mapper.readValue(test, Map.class);
+                } catch (IOException e) {
+                    message = e.getLocalizedMessage();
+                    valid = false;
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(input).valid(valid).explanation(message).build();
+            })
+            .expressionLanguageSupported(true)
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(SEPARATOR);
+        properties = Collections.unmodifiableList(props);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, bos);
+            bos.close();
+
+            String raw = new String(bos.toByteArray());
+            final String flattened = new JsonFlattener(raw)
+                    .withFlattenMode(FlattenMode.KEEP_ARRAYS)
+                    .withSeparator(separator.charAt(0))
+                    .flatten();
+
+            flowFile = session.write(flowFile, os -> os.write(flattened.getBytes()));
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception ex) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c95f964..3fb0de3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -33,6 +33,7 @@ org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExtractText
 org.apache.nifi.processors.standard.FetchSFTP
 org.apache.nifi.processors.standard.FetchFile
+org.apache.nifi.processors.standard.FlattenJson
 org.apache.nifi.processors.standard.GenerateFlowFile
 org.apache.nifi.processors.standard.GetFile
 org.apache.nifi.processors.standard.GetFTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
new file mode 100644
index 0000000..b319258
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+class TestFlattenJson {
+    @Test
+    void testFlatten() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+            test: [
+                msg: "Hello, world"
+            ],
+            first: [
+                second: [
+                    third: [
+                        "one", "two", "three", "four", "five"
+                    ]
+                ]
+            ]
+        ]))
+        baseTest(testRunner, json, 2) { parsed ->
+            Assert.assertEquals("test.msg should exist, but doesn't", parsed["test.msg"], "Hello, world")
+            Assert.assertEquals("Three level block doesn't exist.", parsed["first.second.third"], [
+                    "one", "two", "three", "four", "five"
+            ])
+        }
+    }
+
+    void baseTest(testRunner, String json, int keyCount, Closure c) {
+        baseTest(testRunner, json, [:], keyCount, c);
+    }
+
+    void baseTest(def testRunner, String json, Map attrs, int keyCount, Closure c) {
+        testRunner.enqueue(json, attrs)
+        testRunner.run(1, true)
+        testRunner.assertTransferCount(FlattenJson.REL_FAILURE, 0)
+        testRunner.assertTransferCount(FlattenJson.REL_SUCCESS, 1)
+
+        def flowFiles = testRunner.getFlowFilesForRelationship(FlattenJson.REL_SUCCESS)
+        def content   = testRunner.getContentAsByteArray(flowFiles[0])
+        def asJson    = new String(content)
+        def slurper   = new JsonSlurper()
+        def parsed    = slurper.parseText(asJson) as Map
+
+        Assert.assertEquals("Too many keys", keyCount, parsed.size())
+        c.call(parsed)
+    }
+
+    @Test
+    void testFlattenRecordSet() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+            [
+                first: [
+                    second: "Hello"
+                ]
+            ],
+            [
+                first: [
+                    second: "World"
+                ]
+            ]
+        ]))
+
+        def expected = ["Hello", "World"]
+        baseTest(testRunner, json, 2) { parsed ->
+            Assert.assertTrue("Not a list", parsed instanceof List)
+            0.upto(parsed.size() - 1) {
+                Assert.assertEquals("Missing values.", parsed[it]["first.second"], expected[it])
+            }
+        }
+    }
+
+    @Test
+    void testDifferentSeparator() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+            first: [
+                second: [
+                    third: [
+                        "one", "two", "three", "four", "five"
+                    ]
+                ]
+            ]
+        ]))
+        testRunner.setProperty(FlattenJson.SEPARATOR, "_")
+        baseTest(testRunner, json, 1) { parsed ->
+            Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [
+                "one", "two", "three", "four", "five"
+            ])
+        }
+    }
+
+    @Test
+    void testExpressionLanguage() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+            first: [
+                second: [
+                    third: [
+                        "one", "two", "three", "four", "five"
+                    ]
+                ]
+            ]
+        ]))
+
+        testRunner.setValidateExpressionUsage(true);
+        testRunner.setProperty(FlattenJson.SEPARATOR, '${separator.char}')
+        baseTest(testRunner, json, ["separator.char": "_"], 1) { parsed ->
+            Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [
+                "one", "two", "three", "four", "five"
+            ])
+        }
+    }
+}


Mime
View raw message