nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ijokaruma...@apache.org
Subject [nifi] branch master updated: NIFI-6349 Fix to MergeRecords when handling fragments over multiple iterations
Date Mon, 10 Jun 2019 05:33:30 GMT
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99350b7  NIFI-6349 Fix to MergeRecords when handling fragments over multiple iterations
99350b7 is described below

commit 99350b761dda3458757563b633ed21e5524257d4
Author: Evan Reynolds <evan@usermind.com>
AuthorDate: Tue Jun 4 17:34:26 2019 -0700

    NIFI-6349 Fix to MergeRecords when handling fragments over multiple iterations
    
    Fixed fragment count attribute check
    
    This closes #3517.
    
    Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
---
 .../nifi/processors/standard/merge/RecordBin.java  | 30 ++++++------------
 .../standard/merge/RecordBinManager.java           | 11 +++++--
 .../nifi/processors/standard/TestMergeRecord.java  | 36 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index a96e119..139b2a4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -203,6 +203,10 @@ public class RecordBin {
                 return false;
             }
 
+            if(thresholds.getFragmentCountAttribute().isPresent() && this.fragmentCount == getMinimumRecordCount()) {
+                return true;
+            }
+
             int maxRecords = thresholds.getMaxRecords();
 
             if (recordCount >= maxRecords) {
@@ -213,22 +217,6 @@ public class RecordBin {
                 return true;
             }
 
-            Optional<String> fragmentCountAttribute = thresholds.getFragmentCountAttribute();
-            if(fragmentCountAttribute != null && fragmentCountAttribute.isPresent()) {
-                final Optional<String> fragmentCountValue = flowFiles.stream()
-                        .filter(ff -> ff.getAttribute(fragmentCountAttribute.get()) != null)
-                        .map(ff -> ff.getAttribute(fragmentCountAttribute.get()))
-                        .findFirst();
-                if (fragmentCountValue.isPresent()) {
-                    try {
-                        int expectedFragments = Integer.parseInt(fragmentCountValue.get());
-                        if (this.fragmentCount == expectedFragments)
-                            return true;
-                    } catch (NumberFormatException nfe) {
-                        this.logger.error(nfe.getMessage(), nfe);
-                    }
-                }
-            }
             return false;
         } finally {
             readLock.unlock();
@@ -349,14 +337,14 @@ public class RecordBin {
                         count = Integer.parseInt(countVal);
                     } catch (final NumberFormatException nfe) {
                         logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number",
-                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile});
+                                new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile});
                         fail();
                         return;
                     }
 
                     if (expectedBinCount != null && count != expectedBinCount) {
                         logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}",
-                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
+                                new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
                         fail();
                         return;
                     }
@@ -366,15 +354,15 @@ public class RecordBin {
 
                 if (expectedBinCount == null) {
                     logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles",
-                        new Object[] {flowFiles.size(), countAttr.get()});
+                            new Object[] {flowFiles.size(), countAttr.get()});
                     fail();
                     return;
                 }
 
                 if (expectedBinCount != flowFiles.size()) {
                     logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted "
-                        + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).",
-                        new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount});
+                                    + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).",
+                            new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount});
                     fail();
                     return;
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index f0891a9..c271c2c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -27,6 +27,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.standard.MergeContent;
 import org.apache.nifi.processors.standard.MergeRecord;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -146,7 +147,8 @@ public class RecordBinManager {
         }
 
         // if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
-        final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds());
+
+        final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds(flowFile));
         final boolean binAccepted = bin.offer(flowFile, reader, session, true);
         if (!binAccepted) {
             session.rollback();
@@ -179,8 +181,8 @@ public class RecordBinManager {
     }
 
 
-    private RecordBinThresholds createThresholds() {
-        final int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
+    private RecordBinThresholds createThresholds(FlowFile flowfile) {
+        int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
         final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
         final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
 
@@ -195,6 +197,9 @@ public class RecordBinManager {
         final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
         if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
             fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+            if (!StringUtils.isEmpty(flowfile.getAttribute(fragmentCountAttribute))) {
+                minRecords = Integer.parseInt(flowfile.getAttribute(fragmentCountAttribute));
+            }
         } else {
             fragmentCountAttribute = null;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 3540b04..2f235df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -206,6 +206,42 @@ public class TestMergeRecord {
             .count());
     }
 
+    @Test
+    public void testDefragmentOverMultipleCalls() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+
+        runner.enqueue("Name, Age\nJohn, 35", attr1);
+        runner.run(2);
+
+        assertEquals("Fragment should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
+
+        runner.enqueue("Name, Age\nJane, 34", attr2);
+        runner.run(1);
+
+        assertEquals("Fragments should merge", 0, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+                .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+                .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray())))
+                .count());
+    }
 
     @Test
     public void testDefragmentWithMultipleRecords() {


Mime
View raw message