From commits-return-5166-apmail-nifi-commits-archive=nifi.apache.org@nifi.incubator.apache.org Wed Mar 4 14:30:53 2015 Return-Path: X-Original-To: apmail-nifi-commits-archive@minotaur.apache.org Delivered-To: apmail-nifi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD4F41741B for ; Wed, 4 Mar 2015 14:30:53 +0000 (UTC) Received: (qmail 73680 invoked by uid 500); 4 Mar 2015 14:30:19 -0000 Delivered-To: apmail-nifi-commits-archive@nifi.apache.org Received: (qmail 73593 invoked by uid 500); 4 Mar 2015 14:30:19 -0000 Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.incubator.apache.org Delivered-To: mailing list commits@nifi.incubator.apache.org Received: (qmail 73568 invoked by uid 99); 4 Mar 2015 14:30:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 14:30:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 04 Mar 2015 14:30:14 +0000 Received: (qmail 71191 invoked by uid 99); 4 Mar 2015 14:29:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 14:29:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7FD01E1082; Wed, 4 Mar 2015 14:29:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mcgilman@apache.org To: commits@nifi.incubator.apache.org Date: Wed, 04 Mar 2015 14:30:24 -0000 Message-Id: <28527e3cfaff41b4aec9bb64e4ac6d56@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/50] incubator-nifi git commit: Removing JsonUtils as all functionality was migrated into AbstractJsonPathProcessor given its limited utility outside of those classes. Adjusting validation approach for JsonPath processors to accomodate caching of exp X-Virus-Checked: Checked by ClamAV on apache.org Removing JsonUtils as all functionality was migrated into AbstractJsonPathProcessor given its limited utility outside of those classes. Adjusting validation approach for JsonPath processors to accomodate caching of expressions. Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4d3cff35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4d3cff35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4d3cff35 Branch: refs/heads/NIFI-376 Commit: 4d3cff3592d16d1ce5608b20a025edf34a7c69d7 Parents: b1f9713 Author: Aldrin Piri Authored: Sun Mar 1 13:26:03 2015 -0500 Committer: Aldrin Piri Committed: Sun Mar 1 13:26:03 2015 -0500 ---------------------------------------------------------------------- .../standard/AbstractJsonPathProcessor.java | 41 +++++++++++------ .../processors/standard/EvaluateJsonPath.java | 43 +++++++++++++++++- .../nifi/processors/standard/SplitJson.java | 32 +++++++++++-- .../processors/standard/util/JsonUtils.java | 47 -------------------- 4 files changed, 98 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index 02547f3..baeef7b 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -51,19 +51,6 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider(); - public static final Validator JSON_PATH_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - String error = null; - try { - JsonPath compile = JsonPath.compile(input); - } catch (InvalidPathException ipe) { - error = ipe.toString(); - } - return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build(); - } - }; - static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { // Parse the document once into an associated context to support multiple path evaluations if specified final ObjectHolder contextHolder = new ObjectHolder<>(null); @@ -99,4 +86,32 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { return JSON_PROVIDER.toJson(jsonPathResult); } + protected abstract static class JsonPathValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + JsonPath compiledJsonPath = null; + String error = null; + try { + if (isStale(subject, input)) { + compiledJsonPath = JsonPath.compile(input); + cacheComputedValue(subject, input, compiledJsonPath); + } + } catch (InvalidPathException ipe) { + error = ipe.toString(); + } + return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build(); + } + + /** + * An optional hook to act on the compute value + */ + abstract void cacheComputedValue(String subject, String input, JsonPath computedJsonPath); + + /** + * A hook for implementing classes to determine if a cached value is stale for a compiled JsonPath represented + * by either a validation + */ + abstract boolean isStale(String subject, String input); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 1b89dee..b40f6c6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -20,11 +20,13 @@ import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -38,12 +40,14 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.ObjectHolder; -import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @EventDriven @SideEffectFree @@ -92,6 +96,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { private Set relationships; private List properties; + private ConcurrentMap> cachedJsonPathMap = new ConcurrentHashMap<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -145,13 +150,47 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .expressionLanguageSupported(false) - .addValidator(JSON_PATH_VALIDATOR) + .addValidator(new JsonPathValidator() { + @Override + public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) { + cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath)); + + } + + @Override + public boolean isStale(String subject, String input) { + return cachedJsonPathMap.get(subject) == null; + } + }) .required(false) .dynamic(true) .build(); } @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if (descriptor.isDynamic()) { + if (!StringUtils.equals(oldValue, newValue)) { + cachedJsonPathMap.remove(descriptor.getName()); + } + } + } + + /** + * Provides cleanup of the map for any JsonPath values that may have been created. This will remove common values + * shared between multiple instances, but will be regenerated when the next validation cycle occurs as a result of + * isStale() + */ + @OnRemoved + public void onRemoved() { + for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) { + if (propertyDescriptor.isDynamic()) { + cachedJsonPathMap.remove(propertyDescriptor.getName()); + } + } + } + + @Override public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException { FlowFile flowFile = processSession.get(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 5177bdd..8c7ae4d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -20,6 +20,7 @@ import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -38,6 +39,8 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @EventDriven @SideEffectFree @@ -53,7 +56,17 @@ public class SplitJson extends AbstractJsonPathProcessor { .name("JsonPath Expression") .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.") .required(true) - .addValidator(JSON_PATH_VALIDATOR) + .addValidator(new JsonPathValidator() { + @Override + public void cacheComputedValue(String subject, String input, JsonPath computedJson) { + JSON_PATH_MAP.put(input, computedJson); + } + + @Override + public boolean isStale(String subject, String input) { + return JSON_PATH_MAP.get(input) == null; + } + }) .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); @@ -63,6 +76,8 @@ public class SplitJson extends AbstractJsonPathProcessor { private List properties; private Set relationships; + private static final ConcurrentMap JSON_PATH_MAP = new ConcurrentHashMap(); + @Override protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); @@ -87,6 +102,16 @@ public class SplitJson extends AbstractJsonPathProcessor { } @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) { + if (!StringUtils.equals(oldValue, newValue)) { + // clear the cached item + JSON_PATH_MAP.remove(oldValue); + } + } + } + + @Override public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) { final FlowFile original = processSession.get(); if (original == null) { @@ -104,8 +129,9 @@ public class SplitJson extends AbstractJsonPathProcessor { return; } - final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); - final JsonPath jsonPath = JsonPath.compile(jsonPathExpression); + String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); + final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression); + getLogger().info("Using value {} for split ", new Object[]{jsonPathExpression}); final List segments = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java deleted file mode 100644 index 68b18b8..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import net.minidev.json.JSONValue; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - -/** - * Provides utilities for interacting with JSON elements - * - * @see http://json.org - */ -public class JsonUtils { - - /** - * JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach - * is preferred in determining whether or not a document is valid. - * Performs a validation of the provided stream according to RFC 4627 as implemented by {@link net.minidev.json.parser.JSONParser#MODE_RFC4627} - * - * @param inputStream of content to be validated as JSON - * @return true, if the content is valid within the bounds of the strictness specified; false otherwise - * @throws IOException - */ - public static boolean isValidJson(InputStream inputStream) throws IOException { - try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream)) { - return JSONValue.isValidJsonStrict(inputStreamReader); - } - } - -}