nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [1/5] incubator-nifi git commit: NIFI-387: If possible don't use ContentRepository.importFrom but just copy stream directly in StandardProcessSession
Date Sun, 01 Mar 2015 20:14:21 GMT
Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 50744bfdc -> 83b33c805


NIFI-387: If possible don't use ContentRepository.importFrom but just copy stream directly in StandardProcessSession


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739f0c25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739f0c25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739f0c25

Branch: refs/heads/develop
Commit: 739f0c25e2c885bcfa6eef81a9e4896e618ec0fb
Parents: 50744bf
Author: Mark Payne <markap14@hotmail.com>
Authored: Thu Feb 26 09:04:05 2015 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Thu Feb 26 09:04:05 2015 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 46 +++++++++++---------
 1 file changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739f0c25/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index dcb461c..8d2e456 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2351,35 +2351,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public FlowFile importFrom(final InputStream source, final FlowFile destination) {
         validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
-        final ContentClaim newClaim;
+        ContentClaim newClaim = null;
         long claimOffset = 0L;
 
-        final boolean appendToClaim = isMergeContent();
-        if (appendToClaim) {
-            enforceCurrentWriteClaimState();
-            newClaim = currentWriteClaim;
-            claimOffset = currentWriteClaimSize;
-        } else {
-            try {
-                newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
-            }
-        }
-
         final long newSize;
+        final boolean appendToClaim = isMergeContent();
         try {
-            final boolean append = isMergeContent();
-            newSize = context.getContentRepository().importFrom(source, newClaim, append);
-            bytesWritten.increment(newSize);
-            currentWriteClaimSize += newSize;
+            if (appendToClaim) {
+                enforceCurrentWriteClaimState();
+                newClaim = currentWriteClaim;
+                claimOffset = currentWriteClaimSize;
+                
+                final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
+                bytesWritten.increment(bytesCopied);
+                currentWriteClaimSize += bytesCopied;
+                newSize = bytesCopied;
+            } else {
+                try {
+                    newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+                    claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
+                    
+                    newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
+                    bytesWritten.increment(newSize);
+                    currentWriteClaimSize += newSize;
+                } catch (final IOException e) {
+                    throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
+                }
+            }
         } catch (final Throwable t) {
             if (appendToClaim) {
                 resetWriteClaims();
             }
 
-            destroyContent(newClaim);
+            if ( newClaim != null ) {
+                destroyContent(newClaim);
+            }
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
         }
 


Mime
View raw message