nifi-commits mailing list archives

From: joew...@apache.org
Subject: nifi git commit: NIFI-4008: This closes #2189. Update ConsumeKafkaRecord 0.11 so that it can consume multiple records from a single Kafka message NIFI-4008: Ensure that we always check if a Kafka message's value is null before dereferencing it
Date: Fri, 06 Oct 2017 19:03:45 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 1d5df4a5e -> 582df7f4e


NIFI-4008: This closes #2189. Update ConsumeKafkaRecord 0.11 so that it can consume multiple records from a single Kafka message
NIFI-4008: Ensure that we always check if a Kafka message's value is null before dereferencing it

Signed-off-by: joewitt <joewitt@apache.org>
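
The crux of the ConsumeKafkaRecord change: the previous code called reader.nextRecord() once per Kafka message, so any records after the first one in a message's value were silently dropped; the updated code drains the reader in a while loop. Below is a rough standalone analogy of that loop, using a line-oriented reader in place of NiFi's RecordReader (plain Java; the class name and the newline-delimited payload are illustrative assumptions, not NiFi code).

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class MultiRecordSketch {
    public static void main(String[] args) throws Exception {
        // One Kafka message value carrying three newline-delimited records.
        final byte[] messageValue = "record-1\nrecord-2\nrecord-3\n".getBytes(StandardCharsets.UTF_8);

        try (final BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(messageValue), StandardCharsets.UTF_8))) {
            // Loop until the reader is exhausted, mirroring the new
            // while ((record = reader.nextRecord()) != null) structure;
            // reading only once would drop record-2 and record-3.
            String record;
            while ((record = reader.readLine()) != null) {
                System.out.println("processed: " + record);
            }
        }
    }
}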


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/582df7f4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/582df7f4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/582df7f4

Branch: refs/heads/master
Commit: 582df7f4e8d462e87958e4894bdeac323dc69803
Parents: 1d5df4a
Author: Mark Payne <markap14@hotmail.com>
Authored: Mon Oct 2 16:02:54 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Fri Oct 6 15:03:37 2017 -0400

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  |   7 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 106 +++++++++----------
 2 files changed, 58 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/582df7f4/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 8dc13f4..26562b9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -446,7 +446,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
             attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
             FlowFile failureFlowFile = session.create();
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            if (consumerRecord.value() != null) {
+                failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            }
             failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
             final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
@@ -461,7 +463,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         try {
             for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
 
                     final RecordReader reader;
                     final Record firstRecord;
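
For context on the hunk above: a Kafka ConsumerRecord's value can be null (for example, a tombstone message on a log-compacted topic), so the patch substitutes an empty byte array before wrapping the value in a stream. A minimal standalone sketch of the same guard, in plain Java with no NiFi or Kafka dependencies (the class name and the local "value" variable are illustrative, not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class TombstoneGuardSketch {
    public static void main(String[] args) throws Exception {
        final byte[] value = null; // stands in for consumerRecord.value()

        // Substitute an empty payload for a null value so the stream
        // constructor never dereferences null.
        final byte[] recordBytes = value == null ? new byte[0] : value;
        try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
            System.out.println("bytes available: " + in.available()); // prints 0, no NPE
        }
    }
}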

http://git-wip-us.apache.org/repos/asf/nifi/blob/582df7f4/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 97eed17..0587788 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -512,69 +512,69 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
             for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
                 final Map<String, String> attributes = getAttributes(consumerRecord);
 
-                final Record record;
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
-
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
-
-                // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
-                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
 
-                BundleTracker tracker = bundleMap.get(bundleInfo);
-                if (tracker == null) {
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-
-                    final OutputStream rawOut = session.write(flowFile);
-
-                    final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                        reader = readerFactory.createRecordReader(attributes, in, logger);
                     } catch (final Exception e) {
-                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+                        handleParseFailure(consumerRecord, session, e);
+                        continue;
+                    }
+
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        // Determine the bundle for this record.
+                        final RecordSchema recordSchema = record.getSchema();
+                        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                        BundleTracker tracker = bundleMap.get(bundleInfo);
+                        if (tracker == null) {
+                            FlowFile flowFile = session.create();
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+
+                            final OutputStream rawOut = session.write(flowFile);
+
+                            final RecordSchema writeSchema;
+                            try {
+                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                            } catch (final Exception e) {
+                                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                try {
+                                    rollback(topicPartition);
+                                } catch (final Exception rollbackException) {
+                                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                                }
+
+                                yield();
+                                throw new ProcessException(e);
+                            }
+
+                            writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                            writer.beginRecordSet();
+
+                            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                            tracker.updateFlowFile(flowFile);
+                            bundleMap.put(bundleInfo, tracker);
+                        } else {
+                            writer = tracker.recordWriter;
+                        }
 
                         try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            writer.write(record);
+                        } catch (final RuntimeException re) {
+                            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                            continue;
                         }
 
-                        yield();
-                        throw new ProcessException(e);
+                        tracker.incrementRecordCount(1L);
+                        session.adjustCounter("Records Received", records.size(), false);
                     }
-
-                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                    writer.beginRecordSet();
-
-                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                    tracker.updateFlowFile(flowFile);
-                    bundleMap.put(bundleInfo, tracker);
-                } else {
-                    writer = tracker.recordWriter;
                 }
-
-                try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
-                }
-
-                tracker.incrementRecordCount(1L);
             }
-
-            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back
session and any un-committed offsets from Kafka.", e);
 

