Adding an abstract class to serve as a base class for JsonPath-related processors, and preferring
it over JsonUtils as the home for much of the functionality previously present in JsonUtils.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/46bf048b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/46bf048b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/46bf048b
Branch: refs/heads/NIFI-353
Commit: 46bf048b2407ddbd61024d2d0e69beccb2f72014
Parents: 81234f3
Author: Aldrin Piri <aldrinpiri@gmail.com>
Authored: Sat Feb 21 20:26:58 2015 -0500
Committer: Aldrin Piri <aldrinpiri@gmail.com>
Committed: Sat Feb 21 20:26:58 2015 -0500
----------------------------------------------------------------------
.../standard/AbstractJsonPathProcessor.java | 110 +++++++++++++++++++
.../processors/standard/EvaluateJsonPath.java | 17 +--
.../nifi/processors/standard/SplitJson.java | 14 ++-
.../processors/standard/util/JsonUtils.java | 82 +-------------
4 files changed, 129 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/46bf048b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
new file mode 100644
index 0000000..cb4dcf6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.standard.util.JsonUtils;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.util.BooleanHolder;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides common functionality used for processors interacting and manipulating JSON data
via JsonPath.
+ *
+ * @see <a href="http://json.org">http://json.org</a>
+ * @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
+ */
+public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
+
+ protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
+
+ public static final Validator JSON_PATH_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(final String subject, final String input, final
ValidationContext context) {
+ String error = null;
+ try {
+ JsonPath compile = JsonPath.compile(input);
+ } catch (InvalidPathException ipe) {
+ error = ipe.toString();
+ }
+ return new ValidationResult.Builder().valid(error == null).explanation(error).build();
+ }
+ };
+
+ public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession,
FlowFile flowFile) {
+
+ final BooleanHolder validJsonHolder = new BooleanHolder(false);
+ processSession.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ validJsonHolder.set(JsonUtils.isValidJson(in));
+ }
+ });
+
+ // Parse the document once into an associated context to support multiple path evaluations
if specified
+ final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
+
+ if (validJsonHolder.get()) {
+ processSession.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in))
{
+ DocumentContext ctx = JsonPath.parse(in);
+ contextHolder.set(ctx);
+ }
+ }
+ });
+ }
+
+ return contextHolder.get();
+ }
+
+ /**
+ * Determines the context by which JsonSmartJsonProvider would treat the value. {@link
java.util.Map} and
+ * {@link java.util.List} objects can be rendered as JSON elements, everything else is
treated as a scalar.
+ *
+ * @param obj item to be inspected if it is a scalar or a JSON element
+ * @return false, if the object is a supported type; true otherwise
+ */
+ static boolean isJsonScalar(Object obj) {
+ // For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled
as a JSON entity
+ return !(obj instanceof Map || obj instanceof List);
+ }
+
+ public static String getResultRepresentation(Object jsonPathResult) {
+ if (isJsonScalar(jsonPathResult)) {
+ return jsonPathResult.toString();
+ }
+ return JSON_PROVIDER.toJson(jsonPathResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/46bf048b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 65266ff..f7caa68 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -29,10 +29,12 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.util.JsonUtils;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StringUtils;
@@ -57,7 +59,7 @@ import java.util.*;
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined
path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes
will be created with "
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
-public class EvaluateJsonPath extends AbstractProcessor {
+public class EvaluateJsonPath extends AbstractJsonPathProcessor {
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
@@ -142,7 +144,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(false)
- .addValidator(JsonUtils.JSON_PATH_VALIDATOR)
+ .addValidator(JSON_PATH_VALIDATOR)
.required(false)
.dynamic(true)
.build();
@@ -175,7 +177,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
}
- final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession,
flowFile);
+ final DocumentContext documentContext = validateAndEstablishJsonContext(processSession,
flowFile);
if (documentContext == null) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
@@ -194,7 +196,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
try {
Object result = documentContext.read(jsonPathExp);
- if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result))
{
+ if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result))
{
logger.error("Unable to return a scalar value for the expression {} for
FlowFile {}. Evaluated value was {}. Transferring to {}.",
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(),
REL_FAILURE.getName()});
processSession.transfer(flowFile, REL_FAILURE);
@@ -212,7 +214,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
}
}
- final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
+ final String resultRepresentation = getResultRepresentation(resultHolder.get());
switch (destination) {
case DESTINATION_ATTRIBUTE:
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
@@ -232,4 +234,5 @@ public class EvaluateJsonPath extends AbstractProcessor {
processSession.transfer(flowFile, REL_MATCH);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/46bf048b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 78e1b2a..8df2de0 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -27,9 +27,11 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.util.JsonUtils;
import java.io.IOException;
import java.io.OutputStream;
@@ -44,13 +46,13 @@ import java.util.*;
+ "Each generated FlowFile is comprised of an element of the specified array and
transferred to relationship 'split,' "
+ "with the original file transferred to the 'original' relationship. If the specified
JsonPath is not found or "
+ "does not evaluate to an array element, the original file is routed to 'failure'
and no files are generated.")
-public class SplitJson extends AbstractProcessor {
+public class SplitJson extends AbstractJsonPathProcessor {
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
.name("JsonPath Expression")
.description("A JsonPath expression that indicates the array element to split
into JSON/scalar fragments.")
.required(true)
- .addValidator(JsonUtils.JSON_PATH_VALIDATOR)
+ .addValidator(JSON_PATH_VALIDATOR)
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The
original FlowFile that was split into segments. If the FlowFile fails processing, nothing
will be sent to this relationship").build();
@@ -93,7 +95,7 @@ public class SplitJson extends AbstractProcessor {
final ProcessorLog logger = getLogger();
- final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession,
original);
+ final DocumentContext documentContext = validateAndEstablishJsonContext(processSession,
original);
if (documentContext == null) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
@@ -129,7 +131,7 @@ public class SplitJson extends AbstractProcessor {
split = processSession.write(split, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
- String resultSegmentContent = JsonUtils.getResultRepresentation(resultSegment);
+ String resultSegmentContent = getResultRepresentation(resultSegment);
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
});
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/46bf048b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
index 2174c1e..68b18b8 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
@@ -16,79 +16,19 @@
*/
package org.apache.nifi.processors.standard.util;
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.InvalidPathException;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.spi.json.JsonProvider;
import net.minidev.json.JSONValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.util.BooleanHolder;
-import org.apache.nifi.util.ObjectHolder;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Map;
/**
- * Provides utilities for interacting with JSON elements and JsonPath expressions and results
+ * Provides utilities for interacting with JSON elements
*
* @see <a href="http://json.org">http://json.org</a>
- * @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
*/
public class JsonUtils {
- static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
-
- public static final Validator JSON_PATH_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String input, final
ValidationContext context) {
- String error = null;
- try {
- JsonPath compile = JsonPath.compile(input);
- } catch (InvalidPathException ipe) {
- error = ipe.toString();
- }
- return new ValidationResult.Builder().valid(error == null).explanation(error).build();
- }
- };
-
- public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession,
FlowFile flowFile) {
-
- final BooleanHolder validJsonHolder = new BooleanHolder(false);
- processSession.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- validJsonHolder.set(JsonUtils.isValidJson(in));
- }
- });
-
- // Parse the document once into an associated context to support multiple path evaluations
if specified
- final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
-
- if (validJsonHolder.get()) {
- processSession.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in))
{
- DocumentContext ctx = JsonPath.parse(in);
- contextHolder.set(ctx);
- }
- }
- });
- }
-
- return contextHolder.get();
- }
-
/**
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly
a strict JSON approach
* is preferred in determining whether or not a document is valid.
@@ -104,24 +44,4 @@ public class JsonUtils {
}
}
- /**
- * Determines the context by which JsonSmartJsonProvider would treat the value. {@link
java.util.Map} and
- * {@link java.util.List} objects can be rendered as JSON elements, everything else is
treated as a scalar.
- *
- * @param obj item to be inspected if it is a scalar or a JSON element
- * @return false, if the object is a supported type; true otherwise
- */
- public static boolean isJsonScalar(Object obj) {
- // For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled
as a JSON entity
- return !(obj instanceof Map || obj instanceof List);
- }
-
-
- public static String getResultRepresentation(Object jsonPathResult) {
- if (JsonUtils.isJsonScalar(jsonPathResult)) {
- return jsonPathResult.toString();
- }
- return JSON_PROVIDER.toJson(jsonPathResult);
- }
-
}
|