From commits-return-37013-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Sat Jan 6 00:15:44 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 52B23180647 for ; Sat, 6 Jan 2018 00:15:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 42867160C28; Fri, 5 Jan 2018 23:15:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3F59B160C27 for ; Sat, 6 Jan 2018 00:15:43 +0100 (CET) Received: (qmail 97376 invoked by uid 500); 5 Jan 2018 23:15:40 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 97314 invoked by uid 99); 5 Jan 2018 23:15:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Jan 2018 23:15:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB80ADFDE6; Fri, 5 Jan 2018 23:15:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mattyb149@apache.org To: commits@nifi.apache.org Message-Id: <324d33e2dd9241dd821f6cb6dee5cf4e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: nifi git commit: NIFI-4649 Added FlattenJson processor. Date: Fri, 5 Jan 2018 23:15:40 +0000 (UTC) Repository: nifi Updated Branches: refs/heads/master 89fb1b37d -> d9866c75e NIFI-4649 Added FlattenJson processor. 
Signed-off-by: Matthew Burgess This closes #2307 Replaced star imports, removed unused import Added explanation for invalid Expression Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d9866c75 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d9866c75 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d9866c75 Branch: refs/heads/master Commit: d9866c75e2475986ed799c9fb75c622fc38f5a4f Parents: 89fb1b3 Author: Mike Thomsen Authored: Thu Nov 30 07:23:53 2017 -0500 Committer: Matthew Burgess Committed: Fri Jan 5 18:14:30 2018 -0500 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 5 + .../nifi/processors/standard/FlattenJson.java | 145 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestFlattenJson.groovy | 137 ++++++++++++++++++ 4 files changed, 288 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index a96631a..2557d1a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -303,6 +303,11 @@ + com.github.wnameless + json-flattener + 0.4.1 + + org.apache.bval bval-jsr 1.1.2 http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java new file mode 100644 index 0000000..0406157 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.wnameless.json.flattener.FlattenMode; +import com.github.wnameless.json.flattener.JsonFlattener; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageCompiler; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({ "json", "flatten" }) +@CapabilityDescription( + "Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " + + "document. 
The keys are combined at each level with a user-defined separator that defaults to '.'" +) +@SideEffectFree +public class FlattenJson extends AbstractProcessor { + static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("Successfully flattened files go to this relationship.") + .name("success") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .description("Files that cannot be flattened go to this relationship.") + .name("failure") + .build(); + + static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder() + .name("flatten-json-separator") + .displayName("Separator") + .defaultValue(".") + .description("The separator character used for joining keys. Must be a JSON-legal character.") + .addValidator((subject, input, context) -> { + if (context.isExpressionLanguagePresent(input)) { + ExpressionLanguageCompiler elc = context.newExpressionLanguageCompiler(); + final boolean validExpression = elc.isValidExpression(input); + return new ValidationResult.Builder().subject(subject).input(input) + .valid(validExpression).explanation(validExpression ? "": "Not a valid Expression").build(); + } + + boolean valid = input != null && input.length() == 1; + String message = !valid ? "The separator must be a single character in length." 
: ""; + + ObjectMapper mapper = new ObjectMapper(); + String test = String.format("{ \"prop%sprop\": \"test\" }", input); + try { + mapper.readValue(test, Map.class); + } catch (IOException e) { + message = e.getLocalizedMessage(); + valid = false; + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(valid).explanation(message).build(); + }) + .expressionLanguageSupported(true) + .build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + List props = new ArrayList<>(); + props.add(SEPARATOR); + properties = Collections.unmodifiableList(props); + + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + session.exportTo(flowFile, bos); + bos.close(); + + String raw = new String(bos.toByteArray()); + final String flattened = new JsonFlattener(raw) + .withFlattenMode(FlattenMode.KEEP_ARRAYS) + .withSeparator(separator.charAt(0)) + .flatten(); + + flowFile = session.write(flowFile, os -> os.write(flattened.getBytes())); + + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception ex) { + session.transfer(flowFile, REL_FAILURE); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index c95f964..3fb0de3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -33,6 +33,7 @@ org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.FetchSFTP org.apache.nifi.processors.standard.FetchFile +org.apache.nifi.processors.standard.FlattenJson org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP http://git-wip-us.apache.org/repos/asf/nifi/blob/d9866c75/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy new file mode 100644 index 0000000..b319258 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard + +import groovy.json.JsonSlurper +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Test +import static groovy.json.JsonOutput.prettyPrint +import static groovy.json.JsonOutput.toJson + +class TestFlattenJson { + @Test + void testFlatten() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + test: [ + msg: "Hello, world" + ], + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + baseTest(testRunner, json, 2) { parsed -> + Assert.assertEquals("test.msg should exist, but doesn't", parsed["test.msg"], "Hello, world") + Assert.assertEquals("Three level block doesn't exist.", parsed["first.second.third"], [ + "one", "two", "three", "four", "five" + ]) + } + } + + void baseTest(testRunner, String json, int keyCount, Closure c) { + baseTest(testRunner, json, [:], keyCount, c); + } + + void baseTest(def testRunner, String json, Map attrs, int keyCount, Closure c) { + testRunner.enqueue(json, attrs) + testRunner.run(1, true) + testRunner.assertTransferCount(FlattenJson.REL_FAILURE, 0) + testRunner.assertTransferCount(FlattenJson.REL_SUCCESS, 1) + + def flowFiles = 
testRunner.getFlowFilesForRelationship(FlattenJson.REL_SUCCESS) + def content = testRunner.getContentAsByteArray(flowFiles[0]) + def asJson = new String(content) + def slurper = new JsonSlurper() + def parsed = slurper.parseText(asJson) as Map + + Assert.assertEquals("Too many keys", keyCount, parsed.size()) + c.call(parsed) + } + + @Test + void testFlattenRecordSet() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + [ + first: [ + second: "Hello" + ] + ], + [ + first: [ + second: "World" + ] + ] + ])) + + def expected = ["Hello", "World"] + baseTest(testRunner, json, 2) { parsed -> + Assert.assertTrue("Not a list", parsed instanceof List) + 0.upto(parsed.size() - 1) { + Assert.assertEquals("Missing values.", parsed[it]["first.second"], expected[it]) + } + } + } + + @Test + void testDifferentSeparator() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + testRunner.setProperty(FlattenJson.SEPARATOR, "_") + baseTest(testRunner, json, 1) { parsed -> + Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [ + "one", "two", "three", "four", "five" + ]) + } + } + + @Test + void testExpressionLanguage() { + def testRunner = TestRunners.newTestRunner(FlattenJson.class) + def json = prettyPrint(toJson([ + first: [ + second: [ + third: [ + "one", "two", "three", "four", "five" + ] + ] + ] + ])) + + testRunner.setValidateExpressionUsage(true); + testRunner.setProperty(FlattenJson.SEPARATOR, '${separator.char}') + baseTest(testRunner, json, ["separator.char": "_"], 1) { parsed -> + Assert.assertEquals("Separator not applied.", parsed["first_second_third"], [ + "one", "two", "three", "four", "five" + ]) + } + } +}