nifi-commits mailing list archives

From marka...@apache.org
Subject nifi git commit: NIFI-4008: Allow 0 or more records within a message. This closes #1891.
Date Mon, 02 Oct 2017 19:41:00 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 36b16c9cd -> 58e4fb576


NIFI-4008: Allow 0 or more records within a message. This closes #1891.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58e4fb57
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58e4fb57
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58e4fb57

Branch: refs/heads/master
Commit: 58e4fb576e457b3ce515a1b22ec383b606efd297
Parents: 36b16c9
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Mon Jun 5 21:40:53 2017 +0900
Committer: Mark Payne <markap14@hotmail.com>
Committed: Mon Oct 2 15:40:29 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafkaRecord_0_10.java   |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 151 +++++++++----------
 2 files changed, 75 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
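
Summary of the change: previously, writeRecordData() called reader.nextRecord()
exactly once per Kafka message, and a null result was routed to 'parse.failure';
each message therefore had to contain exactly one record. After this commit the
reader is drained in a loop, so a message may yield zero records (skipped
silently) or many (all appended to the same bundle). A minimal sketch of the new
read loop, using the RecordReader/RecordSetWriter calls from the diff below
(error handling elided):

    final Record firstRecord = reader.nextRecord();
    if (firstRecord == null) {
        continue; // message contains no records; nothing to write or route
    }
    for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
        writer.write(record);                             // append to the bundle's record set
        tracker.incrementRecordCount(1L);                 // feeds the 'record.count' attribute
        session.adjustCounter("Records Received", 1, false);
    }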


http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index c44e25e..adb7a6f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -55,11 +55,11 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the
Kafka 0.10.x Consumer API. "
-    + "The complementary NiFi processor for sending messages is PublishKafka_0_10. Please
note that, at this time, the Processor assumes that "
+    + "The complementary NiFi processor for sending messages is PublishKafkaRecord_0_10.
Please note that, at this time, the Processor assumes that "
     + "all records that are retrieved from a given partition have the same schema. If any
of the Kafka messages are pulled but cannot be parsed or written with the "
     + "configured Record Reader or Record Writer, the contents of the message will be written
to a separate FlowFile, and that FlowFile will be transferred to the "
     + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship
and may contain many individual messages within the single FlowFile. "
-    + "A 'record.count' attribute is added to indicate how many messages are contained in
the FlowFile.")
+    + "A 'record.count' attribute is added to indicate how many records are contained in
the FlowFile.")
 @Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub",
"Consume", "0.10.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 64af412..8dc13f4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -432,105 +433,99 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         bundleMap.put(bundleInfo, tracker);
     }
 
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
-        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
-            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-    }

-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
-        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> messages, final TopicPartition topicPartition) {
+        RecordSetWriter writer = null;
+        RecordSetWriter writer = null;
 
-        FlowFile failureFlowFile = session.create();
-        final byte[] value = consumerRecord.value();
-        if (value != null) {
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
-        }
-        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+        final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
+            // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+            // And continue to the next message.
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+            attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+            attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
-        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+            FlowFile failureFlowFile = session.create();
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+            session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
-        if (cause == null) {
-            logger.error(message);
-        } else {
-            logger.error(message, cause);
-        }
+            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            logger.error("Failed to parse message from Kafka using the configured Record
Reader. "
+                    + "Will route message as its own FlowFile to the 'parse.failure' relationship",
e);
 
-        session.adjustCounter("Parse Failures", 1, false);
-    }
-
-    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        RecordSetWriter writer = null;
+            session.adjustCounter("Parse Failures", 1, false);
+        };
 
         try {
-            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
-                final Record record;
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
                 try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
 
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
+                    final RecordReader reader;
+                    final Record firstRecord;
 
-                // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
-                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
-
-                BundleTracker tracker = bundleMap.get(bundleInfo);
-                if (tracker == null) {
-                    FlowFile flowFile = session.create();
-                    final OutputStream rawOut = session.write(flowFile);
-
-                    final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+                        firstRecord = reader.nextRecord();
                     } catch (final Exception e) {
-                        logger.error("Failed to obtain Schema for FlowFile. Will roll back
the Kafka message offsets.", e);
+                        handleParseFailure.accept(consumerRecord, e);
+                        continue;
+                    }
 
+                    if (firstRecord == null) {
+                        // If the message doesn't contain any record, do nothing.
+                        continue;
+                    }
+
+                    // Determine the bundle for this record.
+                    final RecordSchema recordSchema = firstRecord.getSchema();
+                    final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
+
+                    BundleTracker tracker = bundleMap.get(bundleInfo);
+                    if (tracker == null) {
+                        FlowFile flowFile = session.create();
+                        final OutputStream rawOut = session.write(flowFile);
+
+                        final RecordSchema writeSchema;
                         try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        } catch (final Exception e) {
+                            logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                            try {
+                                rollback(topicPartition);
+                            } catch (final Exception rollbackException) {
+                                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            }
+
+                            yield();
+                            throw new ProcessException(e);
                         }
 
-                        yield();
-                        throw new ProcessException(e);
-                    }
-
-                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                    writer.beginRecordSet();
+                        writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                        writer.beginRecordSet();
 
-                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                    tracker.updateFlowFile(flowFile);
-                    bundleMap.put(bundleInfo, tracker);
-                } else {
-                    writer = tracker.recordWriter;
-                }
+                        tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                        tracker.updateFlowFile(flowFile);
+                        bundleMap.put(bundleInfo, tracker);
+                    } else {
+                        writer = tracker.recordWriter;
+                    }
 
-                try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
+                    try {
+                        for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
+                            writer.write(record);
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", 1, false);
+                        }
+                    } catch (Exception e) {
+                        // Transfer it to 'parse failure' and continue to the next message.
+                        handleParseFailure.accept(consumerRecord, e);
+                    }
                 }
-
-                tracker.incrementRecordCount(1L);
             }
-
-            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back
session and any un-committed offsets from Kafka.", e);
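
Design note: the two private handleParseFailure(...) overloads are folded into a
local BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> declared inside
writeRecordData(), which captures session and topicPartition from the enclosing
scope rather than taking them as parameters. A condensed sketch of that closure
pattern (attribute and provenance handling as in the diff above):

    final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
        FlowFile failureFlowFile = session.create();      // session captured from the enclosing method
        failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
        session.adjustCounter("Parse Failures", 1, false);
    };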
 

