nifi-commits mailing list archives

From: ald...@apache.org
Subject: nifi git commit: NIFI-1316 adding option to DetectDuplicate to not cache the entry identifier
Date: Tue, 12 Jan 2016 04:12:56 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 0c68e2c3a -> 6b54753db


NIFI-1316 adding option to DetectDuplicate to not cache the entry identifier

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b54753d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b54753d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b54753d

Branch: refs/heads/master
Commit: 6b54753dbb9bf6b2694a0cee7ac485fdcc8c3d01
Parents: 0c68e2c
Author: jpercivall <joepercivall@yahoo.com>
Authored: Mon Dec 21 16:05:13 2015 -0500
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Jan 11 22:59:24 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/DetectDuplicate.java    | 33 ++++++++-
 .../standard/TestDetectDuplicate.java           | 78 +++++++++++++++++++-
 2 files changed, 104 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
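
At a glance, the change hinges on which DistributedMapCacheClient call the processor makes. A condensed sketch of the new logic, with names taken from the diff below (not standalone code):

    final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
    final CacheValue originalCacheValue;
    if (shouldCacheIdentifier) {
        // detect AND record: atomically fetch any existing entry, inserting ours if absent
        originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
    } else {
        // detect only: read the entry; another processor is expected to populate the cache
        originalCacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
    }
    boolean duplicate = originalCacheValue != null;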


http://git-wip-us.apache.org/repos/asf/nifi/blob/6b54753d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
index 39dc725..71195d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
@@ -56,7 +56,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 @Tags({"hash", "dupe", "duplicate", "dedupe"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. "
-        + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's"
+        + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's "
         + "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor "
         + "routes the FlowFile to 'non-duplicate'")
 @WritesAttribute(attribute = "original.flowfile.description", description = "All FlowFiles routed to the duplicate relationship will have "
@@ -101,6 +101,16 @@ public class DetectDuplicate extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
+            .name("Cache The Entry Identifier")
+            .description("When true this causes the processor to check for duplicates and cache the Entry Identifier. When false, "
+                    + "the processor will only check for duplicates and not cache the Entry Identifier, requiring another "
+                    + "processor to add identifiers to the distributed cache.")
+            .required(false)
+            .allowableValues("true","false")
+            .defaultValue("true")
+            .build();
+
     public static final Relationship REL_DUPLICATE = new Relationship.Builder()
             .name("duplicate")
             .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship")
@@ -134,6 +144,7 @@ public class DetectDuplicate extends AbstractProcessor {
         descriptors.add(FLOWFILE_DESCRIPTION);
         descriptors.add(AGE_OFF_DURATION);
         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
+        descriptors.add(CACHE_IDENTIFIER);
         return descriptors;
     }
 
@@ -164,14 +175,28 @@ public class DetectDuplicate extends AbstractProcessor {
         try {
             final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue();
             final CacheValue cacheValue = new CacheValue(flowFileDescription, now);
-            final CacheValue originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
+            final CacheValue originalCacheValue;
+
+            final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
+            if (shouldCacheIdentifier) {
+                originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
+            } else {
+                originalCacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
+            }
+
             boolean duplicate = originalCacheValue != null;
             if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) {
                 boolean status = cache.remove(cacheKey, keySerializer);
                 logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
-                // this should typically result in duplicate being false...but, better safe than sorry
-                duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer);
+
+                // both should typically result in duplicate being false...but, better safe than sorry
+                if (shouldCacheIdentifier) {
+                    duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer);
+                } else {
+                    duplicate = cache.containsKey(cacheKey, keySerializer);
+                }
             }
+
             if (duplicate) {
                 session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME);
                 String originalFlowFileDescription = originalCacheValue.getDescription();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b54753d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index 12a5cd4..54e6a29 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -73,7 +73,6 @@ public class TestDetectDuplicate {
         runner.run();
         runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
         runner.clearTransferState();
-        client.exists = true;
         runner.enqueue(new byte[]{}, props);
         runner.run();
         runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1);
@@ -101,7 +100,6 @@ public class TestDetectDuplicate {
         runner.run();
         runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
         runner.clearTransferState();
-        client.exists = true;
         Thread.sleep(3000);
         runner.enqueue(new byte[]{}, props);
         runner.run();
@@ -120,6 +118,72 @@ public class TestDetectDuplicate {
         return client;
     }
 
+    @Test
+    public void testDuplicateNoCache() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+        final DistributedMapCacheClientImpl client = createClient();
+        final Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        runner.addControllerService("client", client, clientProperties);
+        runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
+        runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
+        runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours");
+        runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false");
+        final Map<String, String> props = new HashMap<>();
+        props.put("hash.value", "1000");
+        runner.enqueue(new byte[]{}, props);
+        runner.enableControllerService(client);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+        runner.clearTransferState();
+
+        runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "true");
+        runner.enqueue(new byte[]{}, props);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+        runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0);
+        runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
+        runner.clearTransferState();
+
+        runner.enqueue(new byte[]{}, props);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1);
+        runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0);
+        runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException {
+
+        final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+        final DistributedMapCacheClientImpl client = createClient();
+        final Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
+        runner.addControllerService("client", client, clientProperties);
+        runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
+        runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
+        runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs");
+        runner.enableControllerService(client);
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("hash.value", "1000");
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+
+        runner.clearTransferState();
+        Thread.sleep(3000);
+
+        runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false");
+        runner.enqueue(new byte[]{}, props);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1);
+        runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0);
+        runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
+    }
+
     static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
 
         boolean exists = false;
@@ -150,6 +214,7 @@ public class TestDetectDuplicate {
             }
 
             cacheValue = value;
+            exists = true;
             return true;
         }
 
@@ -160,6 +225,7 @@ public class TestDetectDuplicate {
                 return (V) cacheValue;
             }
             cacheValue = value;
+            exists = true;
             return null;
         }
 
@@ -170,7 +236,11 @@ public class TestDetectDuplicate {
 
         @Override
         public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
-            return null;
+            if (exists) {
+                return (V) cacheValue;
+            } else {
+                return null;
+            }
         }
 
         @Override
@@ -181,6 +251,8 @@ public class TestDetectDuplicate {
 
         @Override
         public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            cacheValue = value;
+            exists = true;
         }
     }
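
Usage note: with "Cache The Entry Identifier" set to false, DetectDuplicate only reads the cache, so another processor in the flow must write identifiers into the same DistributedMapCacheClient. A minimal, hypothetical sketch of such a write is below; the helper class and the plain String key/value with a UTF-8 serializer are illustrative assumptions, and a real flow would need to store values that DetectDuplicate's own value deserializer can read back:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.distributed.cache.client.Serializer;

    public class IdentifierRecorder {
        // hypothetical helper, not part of NiFi: serializes Strings as UTF-8 bytes
        private static final Serializer<String> STRING_SERIALIZER =
                (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));

        // records an identifier; returns true if it was not already present in the cache
        public static boolean record(final DistributedMapCacheClient cache, final String identifier) throws IOException {
            return cache.putIfAbsent(identifier, identifier, STRING_SERIALIZER, STRING_SERIALIZER);
        }
    }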
 

