nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject nifi git commit: NIFI-4562: This closes #2331. If an Exception is thrown when merging FlowFiles, ensure that we remove the 'bundle' flowfile before exiting the method
Date Fri, 08 Dec 2017 20:36:26 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 3b74d2dda -> 1fc1d38fd


NIFI-4562: This closes #2331. If an Exception is thrown when merging FlowFiles, ensure that
we remove the 'bundle' flowfile before exiting the method

Signed-off-by: joewitt <joewitt@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1fc1d38f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1fc1d38f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1fc1d38f

Branch: refs/heads/master
Commit: 1fc1d38fd8bec39d7358c7e04e024d563fc26492
Parents: 3b74d2d
Author: Mark Payne <markap14@hotmail.com>
Authored: Fri Dec 8 13:09:00 2017 -0500
Committer: joewitt <joewitt@apache.org>
Committed: Fri Dec 8 15:35:36 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  | 399 ++++++++++---------
 1 file changed, 212 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1fc1d38f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 79a69a7..e78cd4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -578,49 +578,54 @@ public class MergeContent extends BinFiles {
             final ProcessSession session = bin.getSession();
             FlowFile bundle = session.create(bin.getContents());
             final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null);
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    final byte[] header = getDelimiterContent(context, contents, HEADER);
-                    if (header != null) {
-                        out.write(header);
-                    }
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        final byte[] header = getDelimiterContent(context, contents, HEADER);
+                        if (header != null) {
+                            out.write(header);
+                        }
 
-                    boolean isFirst = true;
-                    final Iterator<FlowFile> itr = contents.iterator();
-                    while (itr.hasNext()) {
-                        final FlowFile flowFile = itr.next();
-                        bin.getSession().read(flowFile, false, new InputStreamCallback()
{
-                            @Override
-                            public void process(final InputStream in) throws IOException
{
-                                StreamUtils.copy(in, out);
-                            }
-                        });
+                        boolean isFirst = true;
+                        final Iterator<FlowFile> itr = contents.iterator();
+                        while (itr.hasNext()) {
+                            final FlowFile flowFile = itr.next();
+                            bin.getSession().read(flowFile, false, new InputStreamCallback()
{
+                                @Override
+                                public void process(final InputStream in) throws IOException
{
+                                    StreamUtils.copy(in, out);
+                                }
+                            });
 
-                        if (itr.hasNext()) {
-                            final byte[] demarcator = getDelimiterContent(context, contents,
DEMARCATOR);
-                            if (demarcator != null) {
-                                out.write(demarcator);
+                            if (itr.hasNext()) {
+                                final byte[] demarcator = getDelimiterContent(context, contents,
DEMARCATOR);
+                                if (demarcator != null) {
+                                    out.write(demarcator);
+                                }
                             }
-                        }
 
-                        final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
-                        if (isFirst) {
-                            bundleMimeTypeRef.set(flowFileMimeType);
-                            isFirst = false;
-                        } else {
-                            if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType))
{
-                                bundleMimeTypeRef.set(null);
+                            final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+                            if (isFirst) {
+                                bundleMimeTypeRef.set(flowFileMimeType);
+                                isFirst = false;
+                            } else {
+                                if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType))
{
+                                    bundleMimeTypeRef.set(null);
+                                }
                             }
                         }
-                    }
 
-                    final byte[] footer = getDelimiterContent(context, contents, FOOTER);
-                    if (footer != null) {
-                        out.write(footer);
+                        final byte[] footer = getDelimiterContent(context, contents, FOOTER);
+                        if (footer != null) {
+                            out.write(footer);
+                        }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
             bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents));
@@ -719,48 +724,53 @@ public class MergeContent extends BinFiles {
             final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents to the #create
method because the parents belong to different sessions
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)
+ ".tar");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)
+ ".tar");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut))
{
-                        out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : "";
-                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
-
-                            final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
-                            tarEntry.setSize(flowFile.getSize());
-                            final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
-                            if (permissionsVal != null) {
-                                try {
-                                    tarEntry.setMode(Integer.parseInt(permissionsVal));
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is set to {}; expected
3 digits between 0-7, so ignoring",
-                                            new Object[]{TAR_PERMISSIONS_ATTRIBUTE, flowFile,
permissionsVal});
+                            out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? getPath(flowFile) : "";
+                                final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                                final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
+                                tarEntry.setSize(flowFile.getSize());
+                                final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
+                                if (permissionsVal != null) {
+                                    try {
+                                        tarEntry.setMode(Integer.parseInt(permissionsVal));
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} is set to {};
expected 3 digits between 0-7, so ignoring",
+                                            new Object[] {TAR_PERMISSIONS_ATTRIBUTE, flowFile,
permissionsVal});
+                                    }
                                 }
-                            }
 
-                            final String modTime = context.getProperty(TAR_MODIFIED_TIME)
+                                final String modTime = context.getProperty(TAR_MODIFIED_TIME)
                                     .evaluateAttributeExpressions(flowFile).getValue();
-                            if (StringUtils.isNotBlank(modTime)) {
-                                try {
-                                    tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is set to {}; expected
ISO8601 format, so ignoring",
-                                            new Object[]{TAR_MODIFIED_TIME, flowFile, modTime});
+                                if (StringUtils.isNotBlank(modTime)) {
+                                    try {
+                                        tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} is set to {};
expected ISO8601 format, so ignoring",
+                                            new Object[] {TAR_MODIFIED_TIME, flowFile, modTime});
+                                    }
                                 }
-                            }
 
-                            out.putArchiveEntry(tarEntry);
+                                out.putArchiveEntry(tarEntry);
 
-                            bin.getSession().exportTo(flowFile, out);
-                            out.closeArchiveEntry();
+                                bin.getSession().exportTo(flowFile, out);
+                                out.closeArchiveEntry();
+                            }
                         }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bin.getSession().getProvenanceReporter().join(contents, bundle);
             return bundle;
@@ -794,35 +804,40 @@ public class MergeContent extends BinFiles {
 
             FlowFile bundle = session.create(contents);
 
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut))
{
-                        // we don't want the packager closing the stream. V1 creates a TAR
Output Stream, which then gets
-                        // closed, which in turn closes the underlying OutputStream, and
we want to protect ourselves against that.
-                        final OutputStream out = new NonCloseableOutputStream(bufferedOut);
-
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new InputStreamCallback()
{
-                                @Override
-                                public void process(final InputStream rawIn) throws IOException
{
-                                    try (final InputStream in = new BufferedInputStream(rawIn))
{
-                                        final Map<String, String> attributes = new
HashMap<>(flowFile.getAttributes());
-
-                                        // for backward compatibility purposes, we add the
"legacy" NiFi attributes
-                                        attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
-                                        attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
-                                        if (attributes.containsKey(CoreAttributes.MIME_TYPE.key()))
{
-                                            attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut))
{
+                            // we don't want the packager closing the stream. V1 creates
a TAR Output Stream, which then gets
+                            // closed, which in turn closes the underlying OutputStream,
and we want to protect ourselves against that.
+                            final OutputStream out = new NonCloseableOutputStream(bufferedOut);
+
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new InputStreamCallback()
{
+                                    @Override
+                                    public void process(final InputStream rawIn) throws IOException
{
+                                        try (final InputStream in = new BufferedInputStream(rawIn))
{
+                                            final Map<String, String> attributes =
new HashMap<>(flowFile.getAttributes());
+
+                                            // for backward compatibility purposes, we add
the "legacy" NiFi attributes
+                                            attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
+                                            attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
+                                            if (attributes.containsKey(CoreAttributes.MIME_TYPE.key()))
{
+                                                attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
+                                            }
+                                            packager.packageFlowFile(in, out, attributes,
flowFile.getSize());
                                         }
-                                        packager.packageFlowFile(in, out, attributes, flowFile.getSize());
                                     }
-                                }
-                            });
+                                });
+                            }
                         }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)
+ ".pkg");
             session.getProvenanceReporter().join(contents, bundle);
@@ -860,34 +875,39 @@ public class MergeContent extends BinFiles {
 
             FlowFile bundle = session.create(contents);
 
-            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)
+ ".zip");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)
+ ".zip");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final ZipOutputStream out = new ZipOutputStream(bufferedOut))
{
-                        out.setLevel(compressionLevel);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : "";
-                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
-                            final ZipEntry zipEntry = new ZipEntry(entryName);
-                            zipEntry.setSize(flowFile.getSize());
-                            try {
-                                out.putNextEntry(zipEntry);
+                            out.setLevel(compressionLevel);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? getPath(flowFile) : "";
+                                final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                                final ZipEntry zipEntry = new ZipEntry(entryName);
+                                zipEntry.setSize(flowFile.getSize());
+                                try {
+                                    out.putNextEntry(zipEntry);
 
-                                bin.getSession().exportTo(flowFile, out);
-                                out.closeEntry();
-                                unmerged.remove(flowFile);
-                            } catch (ZipException e) {
-                                getLogger().error("Encountered exception merging {}", new
Object[]{flowFile}, e);
+                                    bin.getSession().exportTo(flowFile, out);
+                                    out.closeEntry();
+                                    unmerged.remove(flowFile);
+                                } catch (ZipException e) {
+                                    getLogger().error("Encountered exception merging {}",
new Object[] {flowFile}, e);
+                                }
                             }
-                        }
 
-                        out.finish();
-                        out.flush();
+                            out.finish();
+                            out.flush();
+                        }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
             return bundle;
@@ -921,92 +941,97 @@ public class MergeContent extends BinFiles {
 
             // we don't pass the parents to the #create method because the parents belong
to different sessions
             FlowFile bundle = session.create(contents);
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new InputStreamCallback()
{
-                                @Override
-                                public void process(InputStream in) throws IOException {
-                                    boolean canMerge = true;
-                                    try (DataFileStream<GenericRecord> reader = new
DataFileStream<>(in,
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new InputStreamCallback()
{
+                                    @Override
+                                    public void process(InputStream in) throws IOException
{
+                                        boolean canMerge = true;
+                                        try (DataFileStream<GenericRecord> reader =
new DataFileStream<>(in,
                                             new GenericDatumReader<GenericRecord>()))
{
-                                        if (schema.get() == null) {
-                                            // this is the first file - set up the writer,
and store the
-                                            // Schema & metadata we'll use.
-                                            schema.set(reader.getSchema());
-                                            if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy))
{
-                                                for (String key : reader.getMetaKeys()) {
-                                                    if (!DataFileWriter.isReservedMeta(key))
{
-                                                        byte[] metadatum = reader.getMeta(key);
-                                                        metadata.put(key, metadatum);
-                                                        writer.setMeta(key, metadatum);
+                                            if (schema.get() == null) {
+                                                // this is the first file - set up the writer,
and store the
+                                                // Schema & metadata we'll use.
+                                                schema.set(reader.getSchema());
+                                                if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy))
{
+                                                    for (String key : reader.getMetaKeys())
{
+                                                        if (!DataFileWriter.isReservedMeta(key))
{
+                                                            byte[] metadatum = reader.getMeta(key);
+                                                            metadata.put(key, metadatum);
+                                                            writer.setMeta(key, metadatum);
+                                                        }
                                                     }
                                                 }
-                                            }
-                                            inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
-                                            if (inputCodec.get() == null) {
-                                                inputCodec.set(DataFileConstants.NULL_CODEC);
-                                            }
-                                            writer.setCodec(CodecFactory.fromString(inputCodec.get()));
-                                            writer.create(schema.get(), out);
-                                        } else {
-                                            // check that we're appending to the same schema
-                                            if (!schema.get().equals(reader.getSchema()))
{
-                                                getLogger().debug("Input file {} has different
schema - {}, not merging",
-                                                        new Object[]{flowFile.getId(), reader.getSchema().getName()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
-                                            }
+                                                inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
+                                                if (inputCodec.get() == null) {
+                                                    inputCodec.set(DataFileConstants.NULL_CODEC);
+                                                }
+                                                writer.setCodec(CodecFactory.fromString(inputCodec.get()));
+                                                writer.create(schema.get(), out);
+                                            } else {
+                                                // check that we're appending to the same
schema
+                                                if (!schema.get().equals(reader.getSchema()))
{
+                                                    getLogger().debug("Input file {} has
different schema - {}, not merging",
+                                                        new Object[] {flowFile.getId(), reader.getSchema().getName()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
 
-                                            if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
+                                                if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
                                                     || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy))
{
-                                                // check that we're appending to the same
metadata
-                                                for (String key : reader.getMetaKeys()) {
-                                                    if (!DataFileWriter.isReservedMeta(key))
{
-                                                        byte[] metadatum = reader.getMeta(key);
-                                                        byte[] writersMetadatum = metadata.get(key);
-                                                        if (!Arrays.equals(metadatum, writersMetadatum))
{
-                                                            // Ignore additional metadata
if ALL_COMMON is the strategy, otherwise don't merge
-                                                            if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)
|| writersMetadatum != null) {
-                                                                getLogger().debug("Input
file {} has different non-reserved metadata, not merging",
-                                                                        new Object[]{flowFile.getId()});
-                                                                canMerge = false;
-                                                                unmerged.add(flowFile);
+                                                    // check that we're appending to the
same metadata
+                                                    for (String key : reader.getMetaKeys())
{
+                                                        if (!DataFileWriter.isReservedMeta(key))
{
+                                                            byte[] metadatum = reader.getMeta(key);
+                                                            byte[] writersMetadatum = metadata.get(key);
+                                                            if (!Arrays.equals(metadatum,
writersMetadatum)) {
+                                                                // Ignore additional metadata
if ALL_COMMON is the strategy, otherwise don't merge
+                                                                if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)
|| writersMetadatum != null) {
+                                                                    getLogger().debug("Input
file {} has different non-reserved metadata, not merging",
+                                                                        new Object[] {flowFile.getId()});
+                                                                    canMerge = false;
+                                                                    unmerged.add(flowFile);
+                                                                }
                                                             }
                                                         }
                                                     }
-                                                }
-                                            } // else the metadata in the first FlowFile
was either ignored or retained in the if-clause above
+                                                } // else the metadata in the first FlowFile
was either ignored or retained in the if-clause above
 
-                                            // check that we're appending to the same codec
-                                            String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
-                                            if (thisCodec == null) {
-                                                thisCodec = DataFileConstants.NULL_CODEC;
-                                            }
-                                            if (!inputCodec.get().equals(thisCodec)) {
-                                                getLogger().debug("Input file {} has different
codec, not merging",
-                                                        new Object[]{flowFile.getId()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
+                                                // check that we're appending to the same
codec
+                                                String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
+                                                if (thisCodec == null) {
+                                                    thisCodec = DataFileConstants.NULL_CODEC;
+                                                }
+                                                if (!inputCodec.get().equals(thisCodec))
{
+                                                    getLogger().debug("Input file {} has
different codec, not merging",
+                                                        new Object[] {flowFile.getId()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
                                             }
-                                        }
 
-                                        // write the Avro content from the current FlowFile
to the merged OutputStream
-                                        if (canMerge) {
-                                            writer.appendAllFrom(reader, false);
+                                            // write the Avro content from the current FlowFile
to the merged OutputStream
+                                            if (canMerge) {
+                                                writer.appendAllFrom(reader, false);
+                                            }
                                         }
                                     }
-                                }
-                            });
+                                });
+                            }
+                            writer.flush();
+                        } finally {
+                            writer.close();
                         }
-                        writer.flush();
-                    } finally {
-                        writer.close();
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             final Collection<FlowFile> parents;
             if (unmerged.isEmpty()) {


Mime
View raw message