nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [46/50] [abbrv] incubator-nifi git commit: Adjusting handling of map to cache data items on an instance basis.
Date Mon, 02 Mar 2015 04:04:25 GMT
Adjusting handling of map to cache data items on an instance basis.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/973b4933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/973b4933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/973b4933

Branch: refs/heads/develop
Commit: 973b493386c71017f9baa233d4ec178251e64f53
Parents: 484687a
Author: Aldrin Piri <aldrinpiri@gmail.com>
Authored: Sun Mar 1 16:31:32 2015 -0500
Committer: Aldrin Piri <aldrinpiri@gmail.com>
Committed: Sun Mar 1 16:31:32 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/EvaluateJsonPath.java   | 14 ++---
 .../nifi/processors/standard/SplitJson.java     | 55 ++++++++++----------
 2 files changed, 34 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/973b4933/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 81c9bbe..64f6e0d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -40,7 +40,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.ObjectHolder;
-import org.apache.nifi.util.Tuple;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -96,7 +95,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
-    private ConcurrentMap<String, Tuple<String, JsonPath>> cachedJsonPathMap
= new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, JsonPath> cachedJsonPathMap = new ConcurrentHashMap<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -153,13 +152,12 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
                 .addValidator(new JsonPathValidator() {
                     @Override
                     public void cacheComputedValue(String subject, String input, JsonPath
computedJsonPath) {
-                        cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath));
-
+                        cachedJsonPathMap.put(input, computedJsonPath);
                     }
 
                     @Override
                     public boolean isStale(String subject, String input) {
-                        return cachedJsonPathMap.get(subject) == null;
+                        return cachedJsonPathMap.get(input) == null;
                     }
                 })
                 .required(false)
@@ -171,7 +169,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
     public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
         if (descriptor.isDynamic()) {
             if (!StringUtils.equals(oldValue, newValue)) {
-                cachedJsonPathMap.remove(descriptor.getName());
+                if (oldValue != null) {
+                    cachedJsonPathMap.remove(oldValue);
+                }
             }
         }
     }
@@ -185,7 +185,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
     public void onRemoved(ProcessContext processContext) {
         for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) {
             if (propertyDescriptor.isDynamic()) {
-                cachedJsonPathMap.remove(propertyDescriptor.getName());
+                cachedJsonPathMap.remove(processContext.getProperty(propertyDescriptor).getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/973b4933/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 7bb8c4e..5a193a1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -26,8 +26,9 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -35,13 +36,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 @EventDriven
 @SideEffectFree
@@ -56,18 +57,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
     public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
             .name("JsonPath Expression")
             .description("A JsonPath expression that indicates the array element to split
into JSON/scalar fragments.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
-            .addValidator(new JsonPathValidator() {
-                @Override
-                public void cacheComputedValue(String subject, String input, JsonPath computedJson)
{
-                    JSON_PATH_MAP.put(input, computedJson);
-                }
-
-                @Override
-                public boolean isStale(String subject, String input) {
-                    return JSON_PATH_MAP.get(input) == null;
-                }
-            })
             .build();
 
     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The
original FlowFile that was split into segments. If the FlowFile fails processing, nothing
will be sent to this relationship").build();
@@ -77,7 +68,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
-    private static final ConcurrentMap<String, JsonPath> JSON_PATH_MAP = new ConcurrentHashMap();
+    private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -106,21 +97,30 @@ public class SplitJson extends AbstractJsonPathProcessor {
     public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
         if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) {
             if (!StringUtils.equals(oldValue, newValue)) {
-                // clear the cached item
-                JSON_PATH_MAP.remove(oldValue);
+                if (oldValue != null) {
+                    // clear the cached item
+                    JSON_PATH_REF.set(null);
+                }
             }
         }
     }
 
-    /**
-     * Provides cleanup of the map for any JsonPath values that may have been created.  This
will remove common values
-     * shared between multiple instances, but will be regenerated when the next validation
cycle occurs as a result of
-     * isStale()
-     */
-    @OnRemoved
-    public void onRemoved(ProcessContext processContext) {
-        String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
-        JSON_PATH_MAP.remove(jsonPathExpression);
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext)
{
+        JsonPathValidator validator = new JsonPathValidator() {
+            @Override
+            public void cacheComputedValue(String subject, String input, JsonPath computedJson)
{
+                JSON_PATH_REF.set(computedJson);
+            }
+
+            @Override
+            public boolean isStale(String subject, String input) {
+                return JSON_PATH_REF.get() == null;
+            }
+        };
+
+        String value = validationContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
+        return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(),
value, validationContext));
     }
 
     @Override
@@ -141,8 +141,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
             return;
         }
 
-        String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
-        final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression);
+        final JsonPath jsonPath = JSON_PATH_REF.get();
 
         final List<FlowFile> segments = new ArrayList<>();
 


Mime
View raw message