nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi] YolandaMDavis commented on a change in pull request #3913: NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession
Date Wed, 04 Dec 2019 21:37:19 GMT
YolandaMDavis commented on a change in pull request #3913: NIFI-6925: Fixed JoltTransformRecord
for RecordReaders, improved MockProcessSession
URL: https://github.com/apache/nifi/pull/3913#discussion_r353996386
 
 

 ##########
 File path: nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 ##########
 @@ -296,39 +297,37 @@ public void onTrigger(final ProcessContext context, ProcessSession
session) thro
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         final RecordSchema schema;
+        FlowFile transformed = null;
+
         try (final InputStream in = session.read(original);
              final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger()))
{
             schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());
 
-            FlowFile transformed = session.create(original);
             final Map<String, String> attributes = new HashMap<>();
             final WriteResult writeResult;
+            transformed = session.create(original);
+
+            // We want to transform the first record before creating the Record Writer. We
do this because the Record will likely end up with a different structure
+            // and therefore a difference Schema after being transformed. As a result, we
want to transform the Record and then provide the transformed schema to the
+            // Record Writer so that if the Record Writer chooses to inherit the Record Schema
from the Record itself, it will inherit the transformed schema, not the
+            // schema determined by the Record Reader.
+            final Record firstRecord = reader.nextRecord();
+            if (firstRecord == null) {
+                try (final OutputStream out = session.write(transformed);
+                     final RecordSetWriter writer = writerFactory.createWriter(getLogger(),
schema, out, transformed)) {
 
-            try {
-                // We want to transform the first record before creating the Record Writer.
We do this because the Record will likely end up with a different structure
-                // and therefore a difference Schema after being transformed. As a result,
we want to transform the Record and then provide the transformed schema to the
-                // Record Writer so that if the Record Writer chooses to inherit the Record
Schema from the Record itself, it will inherit the transformed schema, not the
-                // schema determined by the Record Reader.
-                final Record firstRecord = reader.nextRecord();
-                if (firstRecord == null) {
-                    try (final OutputStream out = session.write(transformed);
-                         final RecordSetWriter writer = writerFactory.createWriter(getLogger(),
schema, out, transformed)) {
-
-                        writer.beginRecordSet();
-                        writeResult = writer.finishRecordSet();
-
-                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                        attributes.putAll(writeResult.getAttributes());
-                    }
+                    writer.beginRecordSet();
+                    writeResult = writer.finishRecordSet();
 
-                    transformed = session.putAllAttributes(transformed, attributes);
-                    session.transfer(transformed, REL_SUCCESS);
-                    session.transfer(original, REL_ORIGINAL);
-                    logger.info("{} had no Records to transform", new Object[]{original});
-                    return;
+                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                    attributes.putAll(writeResult.getAttributes());
                 }
 
+                transformed = session.putAllAttributes(transformed, attributes);
+                logger.info("{} had no Records to transform", new Object[]{original});
+            } else {
+
                 final JoltTransform transform = getTransform(context, original);
                 final Record transformedFirstRecord = transform(firstRecord, transform);
 
 Review comment:
   If the transform fails here transformedFirstRecord is returned and causes a NullPointerException
on the next line for writeSchema.  Recommend adding a null check.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message