NIFI-387: If possible don't use ContentRepository.importFrom but just copy stream directly
in StandardProcessSession
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739f0c25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739f0c25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739f0c25
Branch: refs/heads/NIFI-376
Commit: 739f0c25e2c885bcfa6eef81a9e4896e618ec0fb
Parents: 50744bf
Author: Mark Payne <markap14@hotmail.com>
Authored: Thu Feb 26 09:04:05 2015 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Thu Feb 26 09:04:05 2015 -0500
----------------------------------------------------------------------
.../repository/StandardProcessSession.java | 46 +++++++++++---------
1 file changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739f0c25/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index dcb461c..8d2e456 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2351,35 +2351,41 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
public FlowFile importFrom(final InputStream source, final FlowFile destination) {
validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
- final ContentClaim newClaim;
+ ContentClaim newClaim = null;
long claimOffset = 0L;
- final boolean appendToClaim = isMergeContent();
- if (appendToClaim) {
- enforceCurrentWriteClaimState();
- newClaim = currentWriteClaim;
- claimOffset = currentWriteClaimSize;
- } else {
- try {
- newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
- claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim,
destination);
- } catch (final IOException e) {
- throw new FlowFileAccessException("Unable to create ContentClaim due to "
+ e.toString(), e);
- }
- }
-
final long newSize;
+ final boolean appendToClaim = isMergeContent();
try {
- final boolean append = isMergeContent();
- newSize = context.getContentRepository().importFrom(source, newClaim, append);
- bytesWritten.increment(newSize);
- currentWriteClaimSize += newSize;
+ if (appendToClaim) {
+ enforceCurrentWriteClaimState();
+ newClaim = currentWriteClaim;
+ claimOffset = currentWriteClaimSize;
+
+ final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
+ bytesWritten.increment(bytesCopied);
+ currentWriteClaimSize += bytesCopied;
+ newSize = bytesCopied;
+ } else {
+ try {
+ newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim,
destination);
+
+ newSize = context.getContentRepository().importFrom(source, newClaim,
appendToClaim);
+ bytesWritten.increment(newSize);
+ currentWriteClaimSize += newSize;
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to create ContentClaim due
to " + e.toString(), e);
+ }
+ }
} catch (final Throwable t) {
if (appendToClaim) {
resetWriteClaims();
}
- destroyContent(newClaim);
+ if ( newClaim != null ) {
+ destroyContent(newClaim);
+ }
throw new FlowFileAccessException("Failed to import data from " + source + "
for " + destination + " due to " + t.toString(), t);
}
|