nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject nifi git commit: NIFI-2925: When swapping in FlowFiles, do not assume that its Resource Claim is 'in use' but instead look up the canonical representation of the resource claim
Date Mon, 31 Oct 2016 14:00:00 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 7fd2c42b1 -> e1f0ba5a4


NIFI-2925: When swapping in FlowFiles, do not assume that its Resource Claim is 'in use' but
instead look up the canonical representation of the resource claim

This closes #1150.

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e1f0ba5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e1f0ba5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e1f0ba5a

Branch: refs/heads/master
Commit: e1f0ba5a43811561d2dce09914ca87db4a3e1738
Parents: 7fd2c42
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Oct 19 16:48:55 2016 -0400
Committer: Bryan Bende <bbende@apache.org>
Committed: Mon Oct 31 09:59:06 2016 -0400

----------------------------------------------------------------------
 .../repository/claim/ResourceClaimManager.java  | 14 ++++-
 .../nifi/controller/FileSystemSwapManager.java  |  9 +++-
 .../apache/nifi/controller/FlowController.java  | 10 ++--
 .../repository/FileSystemRepository.java        |  5 +-
 .../repository/VolatileContentRepository.java   |  2 +-
 .../WriteAheadFlowFileRepository.java           |  2 +-
 .../claim/StandardResourceClaimManager.java     | 54 +++++++++++++++-----
 .../controller/TestFileSystemSwapManager.java   |  7 ++-
 .../repository/TestStandardProcessSession.java  | 20 ++++----
 .../TestVolatileContentRepository.java          |  2 +-
 .../TestWriteAheadFlowFileRepository.java       |  4 +-
 .../claim/TestStandardResourceClaimManager.java |  2 +-
 12 files changed, 93 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index 4fe523e..a85ddc4 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -32,9 +32,21 @@ public interface ResourceClaimManager {
      * @param container of claim
      * @param section of claim
      * @param lossTolerant of claim
+     * @param writable whether or not the claim should be made writable
      * @return new claim
      */
-    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant);
+    ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable);
+
+    /**
+     * Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @return the existing resource claim or <code>null</code> if none exists
+     */
+    ResourceClaim getResourceClaim(String container, String section, String id);
+
 
     /**
      * @param claim to obtain reference count for

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 3c4610f..350cceb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -504,7 +504,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         lossTolerant = false;
                     }
 
-                    resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                    resourceClaim = claimManager.getResourceClaim(container, section, claimId);
+                    if (resourceClaim == null) {
+                        logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+                            + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+                            + "ability to properly clean up this resource", container, section, claimId);
+                        resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
+                    }
+
                     final StandardContentClaim claim = new StandardContentClaim(resourceClaim,
resourceOffset);
                     claim.setLength(resourceLength);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b42f3ae..89a4379 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3553,7 +3553,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
                     return null;
                 }
 
-                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container,
section, identifier, false);
+                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container,
section, identifier, false, false);
                 return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
             }
 
@@ -3579,7 +3579,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(),
provEvent.getPreviousContentClaimSection(),
-                    provEvent.getPreviousContentClaimIdentifier(), false);
+                provEvent.getPreviousContentClaimIdentifier(), false, false);
             claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
             offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
@@ -3589,7 +3589,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(),
provEvent.getContentClaimSection(),
-                    provEvent.getContentClaimIdentifier(), false);
+                provEvent.getContentClaimIdentifier(), false, false);
 
             claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
             offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
@@ -3682,7 +3682,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
         }
 
         try {
-            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer,
contentClaimSection, contentClaimId, false);
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer,
contentClaimSection, contentClaimId, false, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
 
             if (!contentRepository.isAccessible(contentClaim)) {
@@ -3763,7 +3763,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider,
R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(),
false);
+            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(),
false, false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
         resourceClaimManager.incrementClaimantCount(resourceClaim);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 2960091..e45852a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -439,7 +439,7 @@ public class FileSystemRepository implements ContentRepository {
         final String id = idPath.toFile().getName();
         final String sectionName = sectionPath.toFile().getName();
 
-        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName,
sectionName, id, false);
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName,
sectionName, id, false, false);
         if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
             removeIncompleteContent(fileToRemove);
         }
@@ -537,7 +537,7 @@ public class FileSystemRepository implements ContentRepository {
             final String section = String.valueOf(modulatedSectionIndex);
             final String claimId = System.currentTimeMillis() + "-" + currentIndex;
 
-            resourceClaim = resourceClaimManager.newResourceClaim(containerName, section,
claimId, lossTolerant);
+            resourceClaim = resourceClaimManager.newResourceClaim(containerName, section,
claimId, lossTolerant, true);
             resourceOffset = 0L;
             LOG.debug("Creating new Resource Claim {}", resourceClaim);
 
@@ -949,6 +949,7 @@ public class FileSystemRepository implements ContentRepository {
                         LOG.debug("Claim length less than max; Adding {} back to Writable
Claim Queue", this);
                     } else {
                         writableClaimStreams.remove(scc.getResourceClaim());
+                        resourceClaimManager.freeze(scc.getResourceClaim());
 
                         bcos.close();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 95b503b..f7ff7b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -217,7 +217,7 @@ public class VolatileContentRepository implements ContentRepository {
 
     private ContentClaim createLossTolerant() {
         final long id = idGenerator.getAndIncrement();
-        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME,
"section", String.valueOf(id), true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME,
"section", String.valueOf(id), true, false);
         final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
         final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
         claimManager.incrementClaimantCount(resourceClaim, true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 36d592c..9c2a7d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -826,7 +826,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository,
SyncLis
                     lossTolerant = false;
                 }
 
-                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container,
section, claimId, lossTolerant);
+                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container,
section, claimId, lossTolerant, false);
                 final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim,
resourceOffset);
                 contentClaim.setLength(resourceLength);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 9cb0fa1..be0e8b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory;
 
 public class StandardResourceClaimManager implements ResourceClaimManager {
 
-    private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
     private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
 
     private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
 
     @Override
-    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
-        return new StandardResourceClaim(this, container, section, id, lossTolerant);
+    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
+        final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant);
+        if (!writable) {
+            claim.freeze();
+        }
+        return claim;
+    }
+
+    @Override
+    public ResourceClaim getResourceClaim(final String container, final String section, final String id) {
+        final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false);
+        final ClaimCount count = claimantCounts.get(tempClaim);
+        return (count == null) ? null : count.getClaim();
     }
 
     private static AtomicInteger getCounter(final ResourceClaim claim) {
@@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager
{
             return null;
         }
 
-        AtomicInteger counter = claimantCounts.get(claim);
+        ClaimCount counter = claimantCounts.get(claim);
         if (counter != null) {
-            return counter;
+            return counter.getCount();
         }
 
-        counter = new AtomicInteger(0);
-        final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter);
-        return existingCounter == null ? counter : existingCounter;
+        counter = new ClaimCount(claim, new AtomicInteger(0));
+        final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter);
+        return existingCounter == null ? counter.getCount() : existingCounter.getCount();
     }
 
     @Override
@@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager
{
         }
 
         synchronized (claim) {
-            final AtomicInteger counter = claimantCounts.get(claim);
-            return counter == null ? 0 : counter.get();
+            final ClaimCount counter = claimantCounts.get(claim);
+            return counter == null ? 0 : counter.getCount().get();
         }
     }
 
@@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager
{
         }
 
         synchronized (claim) {
-            final AtomicInteger counter = claimantCounts.get(claim);
+            final ClaimCount counter = claimantCounts.get(claim);
             if (counter == null) {
                 logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
                 return -1;
             }
 
-            final int newClaimantCount = counter.decrementAndGet();
+            final int newClaimantCount = counter.getCount().decrementAndGet();
             if (newClaimantCount < 0) {
                 logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
             } else {
@@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager
{
 
         ((StandardResourceClaim) claim).freeze();
     }
+
+
+    private static final class ClaimCount {
+        private final ResourceClaim claim;
+        private final AtomicInteger count;
+
+        public ClaimCount(final ResourceClaim claim, final AtomicInteger count) {
+            this.claim = claim;
+            this.count = count;
+        }
+
+        public AtomicInteger getCount() {
+            return count;
+        }
+
+        public ResourceClaim getClaim() {
+            return claim;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 6c52def..97226b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -115,7 +115,12 @@ public class TestFileSystemSwapManager {
     public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
-        public ResourceClaim newResourceClaim(String container, String section, String id,
boolean lossTolerant) {
+        public ResourceClaim newResourceClaim(String container, String section, String id,
boolean lossTolerant, boolean writable) {
+            return null;
+        }
+
+        @Override
+        public ResourceClaim getResourceClaim(String container, String section, String id)
{
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 8d398aa..a286024 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -817,7 +817,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .size(1L)
                 .build();
         flowFileQueue.put(flowFileRecord);
@@ -964,7 +964,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .size(1L)
                 .build();
         flowFileQueue.put(flowFileRecord);
@@ -988,7 +988,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .build();
         flowFileQueue.put(flowFileRecord);
 
@@ -1004,7 +1004,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .contentClaimOffset(1000L)
                 .size(1000L)
                 .build();
@@ -1029,7 +1029,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .build();
 
         flowFileQueue.put(flowFileRecord);
@@ -1046,7 +1046,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
@@ -1113,7 +1113,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
 
@@ -1150,7 +1150,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
                 .entryDate(System.currentTimeMillis())
-                .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true), 0L))
+            .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x",
"x", "0", true, false), 0L))
                 .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
 
@@ -1477,7 +1477,7 @@ public class TestStandardProcessSession {
             final Set<ContentClaim> claims = new HashSet<>();
 
             for (long i = 0; i < idGenerator.get(); i++) {
-                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container",
"section", String.valueOf(i), false);
+                final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container",
"section", String.valueOf(i), false, false);
                 final ContentClaim contentClaim = new StandardContentClaim(resourceClaim,
0L);
                 if (getClaimantCount(contentClaim) > 0) {
                     claims.add(contentClaim);
@@ -1489,7 +1489,7 @@ public class TestStandardProcessSession {
 
         @Override
         public ContentClaim create(boolean lossTolerant) throws IOException {
-            final ResourceClaim resourceClaim = claimManager.newResourceClaim("container",
"section", String.valueOf(idGenerator.getAndIncrement()), false);
+            final ResourceClaim resourceClaim = claimManager.newResourceClaim("container",
"section", String.valueOf(idGenerator.getAndIncrement()), false, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
 
             claimantCounts.put(contentClaim, new AtomicInteger(1));

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index feed31a..cebe91b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -83,7 +83,7 @@ public class TestVolatileContentRepository {
 
         final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
         contentRepo.setBackupRepository(mockRepo);
-        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section",
"1000", true);
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section",
"1000", true, false);
         final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
         Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 674f78f..b2ea0b9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -87,10 +87,10 @@ public class TestWriteAheadFlowFileRepository {
         when(connection.getFlowFileQueue()).thenReturn(queue);
         queueProvider.addConnection(connection);
 
-        final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section",
"1", false);
+        final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section",
"1", false, false);
         final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
 
-        final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section",
"2", false);
+        final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section",
"2", false, false);
         final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
 
         // Create a flowfile repo, update it once with a FlowFile that points to one resource
claim. Then,

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1f0ba5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index d29105a..867810e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager {
             }
         };
 
-        final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section",
"id", false);
+        final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section",
"id", false, false);
         assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant
count to 1.
 
         assertEquals(1, manager.getClaimantCount(resourceClaim));


Mime
View raw message