nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [14/50] [abbrv] incubator-nifi git commit: Removing the batched get of flowfiles to utilize the framework provided batching support
Date Mon, 02 Mar 2015 04:03:53 GMT
Removing the batched get of flowfiles to utilize the framework provided batching support


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/81234f3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/81234f3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/81234f3a

Branch: refs/heads/develop
Commit: 81234f3a6dd1effd4c66bcbdbe8d119843b7fdf2
Parents: 2862771
Author: Aldrin Piri <aldrinpiri@gmail.com>
Authored: Fri Feb 20 16:05:16 2015 -0500
Committer: Aldrin Piri <aldrinpiri@gmail.com>
Committed: Fri Feb 20 16:05:16 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   | 103 +++++++++----------
 1 file changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/81234f3a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index d79a6de..65266ff 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -151,8 +151,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
     @Override
     public void onTrigger(ProcessContext processContext, final ProcessSession processSession)
throws ProcessException {
 
-        List<FlowFile> flowFiles = processSession.get(50);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = processSession.get();
+        if (flowFile == null) {
             return;
         }
 
@@ -175,66 +175,61 @@ public class EvaluateJsonPath extends AbstractProcessor {
             returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
         }
 
-        flowFileLoop:
-        for (FlowFile flowFile : flowFiles) {
+        final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession,
flowFile);
 
-            final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession,
flowFile);
+        if (documentContext == null) {
+            logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
+            processSession.transfer(flowFile, REL_FAILURE);
+            return;
+        }
 
-            if (documentContext == null) {
-                logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
-                processSession.transfer(flowFile, REL_FAILURE);
-                continue flowFileLoop;
-            }
+        final Map<String, String> jsonPathResults = new HashMap<>();
+
+        jsonPathEvalLoop:
+        for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet())
{
 
-            final Map<String, String> jsonPathResults = new HashMap<>();
-
-            jsonPathEvalLoop:
-            for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet())
{
-
-                String jsonPathAttrKey = attributeJsonPathEntry.getKey();
-                JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
-
-                final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
-                try {
-                    Object result = documentContext.read(jsonPathExp);
-                    if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result))
{
-                        logger.error("Unable to return a scalar value for the expression
{} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
-                                new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(),
REL_FAILURE.getName()});
-                        processSession.transfer(flowFile, REL_FAILURE);
-                        continue flowFileLoop;
-                    }
-                    resultHolder.set(result);
-                } catch (PathNotFoundException e) {
-                    logger.warn("FlowFile {} could not find path {} for attribute key {}.",
new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
-                    if (destination.equals(DESTINATION_ATTRIBUTE)) {
-                        jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
-                        continue jsonPathEvalLoop;
-                    } else {
-                        processSession.transfer(flowFile, REL_NO_MATCH);
-                        continue flowFileLoop;
-                    }
+            String jsonPathAttrKey = attributeJsonPathEntry.getKey();
+            JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
+
+            final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
+            try {
+                Object result = documentContext.read(jsonPathExp);
+                if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result))
{
+                    logger.error("Unable to return a scalar value for the expression {} for
FlowFile {}. Evaluated value was {}. Transferring to {}.",
+                            new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(),
REL_FAILURE.getName()});
+                    processSession.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+                resultHolder.set(result);
+            } catch (PathNotFoundException e) {
+                logger.warn("FlowFile {} could not find path {} for attribute key {}.", new
Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
+                if (destination.equals(DESTINATION_ATTRIBUTE)) {
+                    jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
+                    continue jsonPathEvalLoop;
+                } else {
+                    processSession.transfer(flowFile, REL_NO_MATCH);
+                    return;
                 }
+            }
 
-                final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
-                switch (destination) {
-                    case DESTINATION_ATTRIBUTE:
-                        jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
-                        break;
-                    case DESTINATION_CONTENT:
-                        flowFile = processSession.write(flowFile, new OutputStreamCallback()
{
-                            @Override
-                            public void process(final OutputStream out) throws IOException
{
-                                try (OutputStream outputStream = new BufferedOutputStream(out))
{
-                                    outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
-                                }
+            final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
+            switch (destination) {
+                case DESTINATION_ATTRIBUTE:
+                    jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
+                case DESTINATION_CONTENT:
+                    flowFile = processSession.write(flowFile, new OutputStreamCallback()
{
+                        @Override
+                        public void process(final OutputStream out) throws IOException {
+                            try (OutputStream outputStream = new BufferedOutputStream(out))
{
+                                outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
                             }
-                        });
-                        break;
-                }
+                        }
+                    });
+                    break;
             }
-            flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
-            processSession.transfer(flowFile, REL_MATCH);
         }
+        flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
+        processSession.transfer(flowFile, REL_MATCH);
     }
 
 }


Mime
View raw message