nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [29/50] incubator-nifi git commit: Removing the separate reads for validation preferring to do the read once and handle any exceptions.
Date Fri, 06 Mar 2015 04:21:43 GMT
Removing the separate reads for validation preferring to do the read once and handle any exceptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/162f02b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/162f02b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/162f02b1

Branch: refs/heads/NIFI-353
Commit: 162f02b12fd7b6d1a78efab09942ceffea49e07d
Parents: 57aa5dd
Author: Aldrin Piri <aldrinpiri@gmail.com>
Authored: Sat Feb 28 21:24:25 2015 -0500
Committer: Aldrin Piri <aldrinpiri@gmail.com>
Committed: Sat Feb 28 21:24:25 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractJsonPathProcessor.java     | 36 ++++++++------------
 .../processors/standard/EvaluateJsonPath.java   | 15 ++++----
 .../nifi/processors/standard/SplitJson.java     |  9 ++---
 3 files changed, 26 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/162f02b1/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
index 70efe38..02547f3 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
@@ -20,7 +20,9 @@ import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.InvalidPathException;
 import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider;
 import com.jayway.jsonpath.spi.json.JsonProvider;
+import net.minidev.json.parser.JSONParser;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -28,9 +30,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processors.standard.util.JsonUtils;
 import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.util.BooleanHolder;
 import org.apache.nifi.util.ObjectHolder;
 
 import java.io.IOException;
@@ -46,7 +46,10 @@ import java.util.Map;
  */
 public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
 
-    protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
+    private static final Configuration STRICT_PROVIDER_CONFIGURATION =
+            Configuration.builder().jsonProvider(new JsonSmartJsonProvider(JSONParser.MODE_RFC4627)).build();
+
+    private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
 
     public static final Validator JSON_PATH_VALIDATOR = new Validator() {
         @Override
@@ -57,35 +60,23 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor
{
             } catch (InvalidPathException ipe) {
                 error = ipe.toString();
             }
-            return new ValidationResult.Builder().subject("JsonPath expression " + subject).valid(error
== null).explanation(error).build();
+            return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
         }
     };
 
     static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession,
FlowFile flowFile) {
-
-        final BooleanHolder validJsonHolder = new BooleanHolder(false);
+        // Parse the document once into an associated context to support multiple path evaluations
if specified
+        final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
         processSession.read(flowFile, new InputStreamCallback() {
             @Override
             public void process(InputStream in) throws IOException {
-                validJsonHolder.set(JsonUtils.isValidJson(in));
+                try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in))
{
+                    DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream);
+                    contextHolder.set(ctx);
+                }
             }
         });
 
-        // Parse the document once into an associated context to support multiple path evaluations
if specified
-        final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
-
-        if (validJsonHolder.get()) {
-            processSession.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in))
{
-                        DocumentContext ctx = JsonPath.parse(in);
-                        contextHolder.set(ctx);
-                    }
-                }
-            });
-        }
-
         return contextHolder.get();
     }
 
@@ -107,4 +98,5 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor
{
         }
         return JSON_PROVIDER.toJson(jsonPathResult);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/162f02b1/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index ed0f07c..1b89dee 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -177,9 +178,10 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
             returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
         }
 
-        final DocumentContext documentContext = validateAndEstablishJsonContext(processSession,
flowFile);
-
-        if (documentContext == null) {
+        DocumentContext documentContext = null;
+        try {
+            documentContext = validateAndEstablishJsonContext(processSession, flowFile);
+        } catch (InvalidJsonException e) {
             logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
             processSession.transfer(flowFile, REL_FAILURE);
             return;
@@ -187,7 +189,6 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
 
         final Map<String, String> jsonPathResults = new HashMap<>();
 
-        jsonPathEvalLoop:
         for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet())
{
 
             String jsonPathAttrKey = attributeJsonPathEntry.getKey();
@@ -207,7 +208,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
                 logger.warn("FlowFile {} could not find path {} for attribute key {}.", new
Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
                 if (destination.equals(DESTINATION_ATTRIBUTE)) {
                     jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
-                    continue jsonPathEvalLoop;
+                    continue;
                 } else {
                     processSession.transfer(flowFile, REL_NO_MATCH);
                     return;
@@ -235,6 +236,4 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
         flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
         processSession.transfer(flowFile, REL_MATCH);
     }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/162f02b1/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index fe2216a..5177bdd 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.PathNotFoundException;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -94,10 +95,10 @@ public class SplitJson extends AbstractJsonPathProcessor {
 
         final ProcessorLog logger = getLogger();
 
-
-        final DocumentContext documentContext = validateAndEstablishJsonContext(processSession,
original);
-
-        if (documentContext == null) {
+        DocumentContext documentContext = null;
+        try {
+            documentContext = validateAndEstablishJsonContext(processSession, original);
+        } catch (InvalidJsonException e) {
             logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
             processSession.transfer(original, REL_FAILURE);
             return;


Mime
View raw message