nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [31/50] incubator-nifi git commit: Removing JsonUtils as all functionality was migrated into AbstractJsonPathProcessor given its limited utility outside of those classes. Adjusting validation approach for JsonPath processors to accommodate caching of exp
Date Wed, 04 Mar 2015 14:30:24 GMT
Removing JsonUtils as all functionality was migrated into AbstractJsonPathProcessor given its
limited utility outside of those classes.  Adjusting validation approach for JsonPath processors
to accommodate caching of expressions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4d3cff35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4d3cff35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4d3cff35

Branch: refs/heads/NIFI-376
Commit: 4d3cff3592d16d1ce5608b20a025edf34a7c69d7
Parents: b1f9713
Author: Aldrin Piri <aldrinpiri@gmail.com>
Authored: Sun Mar 1 13:26:03 2015 -0500
Committer: Aldrin Piri <aldrinpiri@gmail.com>
Committed: Sun Mar 1 13:26:03 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractJsonPathProcessor.java     | 41 +++++++++++------
 .../processors/standard/EvaluateJsonPath.java   | 43 +++++++++++++++++-
 .../nifi/processors/standard/SplitJson.java     | 32 +++++++++++--
 .../processors/standard/util/JsonUtils.java     | 47 --------------------
 4 files changed, 98 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
index 02547f3..baeef7b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
@@ -51,19 +51,6 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor
{
 
     private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
 
-    public static final Validator JSON_PATH_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final
ValidationContext context) {
-            String error = null;
-            try {
-                JsonPath compile = JsonPath.compile(input);
-            } catch (InvalidPathException ipe) {
-                error = ipe.toString();
-            }
-            return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
-        }
-    };
-
     static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession,
FlowFile flowFile) {
         // Parse the document once into an associated context to support multiple path evaluations
if specified
         final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
@@ -99,4 +86,32 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor
{
         return JSON_PROVIDER.toJson(jsonPathResult);
     }
 
+    protected abstract static class JsonPathValidator implements Validator {
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final
ValidationContext context) {
+            JsonPath compiledJsonPath = null;
+            String error = null;
+            try {
+                if (isStale(subject, input)) {
+                    compiledJsonPath = JsonPath.compile(input);
+                    cacheComputedValue(subject, input, compiledJsonPath);
+                }
+            } catch (InvalidPathException ipe) {
+                error = ipe.toString();
+            }
+            return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
+        }
+
+        /**
+         * An optional hook to act on the compute value
+         */
+        abstract void cacheComputedValue(String subject, String input, JsonPath computedJsonPath);
+
+        /**
+         * A hook for implementing classes to determine if a cached value is stale for a
compiled JsonPath represented
+         * by either a validation
+         */
+        abstract boolean isStale(String subject, String input);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 1b89dee..b40f6c6 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -20,11 +20,13 @@ import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -38,12 +40,14 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.ObjectHolder;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 @EventDriven
 @SideEffectFree
@@ -92,6 +96,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
+    private ConcurrentMap<String, Tuple<String, JsonPath>> cachedJsonPathMap
= new ConcurrentHashMap<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -145,13 +150,47 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
                 .expressionLanguageSupported(false)
-                .addValidator(JSON_PATH_VALIDATOR)
+                .addValidator(new JsonPathValidator() {
+                    @Override
+                    public void cacheComputedValue(String subject, String input, JsonPath
computedJsonPath) {
+                        cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath));
+
+                    }
+
+                    @Override
+                    public boolean isStale(String subject, String input) {
+                        return cachedJsonPathMap.get(subject) == null;
+                    }
+                })
                 .required(false)
                 .dynamic(true)
                 .build();
     }
 
     @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
+        if (descriptor.isDynamic()) {
+            if (!StringUtils.equals(oldValue, newValue)) {
+                cachedJsonPathMap.remove(descriptor.getName());
+            }
+        }
+    }
+
+    /**
+     * Provides cleanup of the map for any JsonPath values that may have been created.  This
will remove common values
+     * shared between multiple instances, but will be regenerated when the next validation
cycle occurs as a result of
+     * isStale()
+     */
+    @OnRemoved
+    public void onRemoved() {
+        for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) {
+            if (propertyDescriptor.isDynamic()) {
+                cachedJsonPathMap.remove(propertyDescriptor.getName());
+            }
+        }
+    }
+
+    @Override
     public void onTrigger(ProcessContext processContext, final ProcessSession processSession)
throws ProcessException {
 
         FlowFile flowFile = processSession.get();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 5177bdd..8c7ae4d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -20,6 +20,7 @@ import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -38,6 +39,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 @EventDriven
 @SideEffectFree
@@ -53,7 +56,17 @@ public class SplitJson extends AbstractJsonPathProcessor {
             .name("JsonPath Expression")
             .description("A JsonPath expression that indicates the array element to split
into JSON/scalar fragments.")
             .required(true)
-            .addValidator(JSON_PATH_VALIDATOR)
+            .addValidator(new JsonPathValidator() {
+                @Override
+                public void cacheComputedValue(String subject, String input, JsonPath computedJson)
{
+                    JSON_PATH_MAP.put(input, computedJson);
+                }
+
+                @Override
+                public boolean isStale(String subject, String input) {
+                    return JSON_PATH_MAP.get(input) == null;
+                }
+            })
             .build();
 
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The
original FlowFile that was split into segments. If the FlowFile fails processing, nothing
will be sent to this relationship").build();
@@ -63,6 +76,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
+    private static final ConcurrentMap<String, JsonPath> JSON_PATH_MAP = new ConcurrentHashMap();
+
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -87,6 +102,16 @@ public class SplitJson extends AbstractJsonPathProcessor {
     }
 
     @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
+        if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) {
+            if (!StringUtils.equals(oldValue, newValue)) {
+                // clear the cached item
+                JSON_PATH_MAP.remove(oldValue);
+            }
+        }
+    }
+
+    @Override
     public void onTrigger(final ProcessContext processContext, final ProcessSession processSession)
{
         final FlowFile original = processSession.get();
         if (original == null) {
@@ -104,8 +129,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
             return;
         }
 
-        final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
-        final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
+        String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
+        final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression);
+        getLogger().info("Using value {} for split ", new Object[]{jsonPathExpression});
 
         final List<FlowFile> segments = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d3cff35/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
deleted file mode 100644
index 68b18b8..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import net.minidev.json.JSONValue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Provides utilities for interacting with JSON elements
- *
- * @see <a href="http://json.org">http://json.org</a>
- */
-public class JsonUtils {
-
-    /**
-     * JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly
a strict JSON approach
-     * is preferred in determining whether or not a document is valid.
-     * Performs a validation of the provided stream according to RFC 4627 as implemented
by {@link net.minidev.json.parser.JSONParser#MODE_RFC4627}
-     *
-     * @param inputStream of content to be validated as JSON
-     * @return true, if the content is valid within the bounds of the strictness specified;
false otherwise
-     * @throws IOException
-     */
-    public static boolean isValidJson(InputStream inputStream) throws IOException {
-        try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream)) {
-            return JSONValue.isValidJsonStrict(inputStreamReader);
-        }
-    }
-
-}


Mime
View raw message