From: markap14@apache.org
To: commits@nifi.apache.org
Reply-To: dev@nifi.apache.org
Subject: nifi git commit: NIFI-4008: Allow 0 or more records within a message. This closes #1891.
Date: Mon, 2 Oct 2017 19:41:00 +0000 (UTC)

Repository: nifi
Updated Branches:
  refs/heads/master 36b16c9cd -> 58e4fb576


NIFI-4008: Allow 0 or more records within a message. This closes #1891.
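In short: where the processor previously read exactly one record per Kafka message, the patched loop drains each message until the reader returns null, so a message may contribute zero, one, or many records to the outgoing FlowFile. The sketch below is illustrative only and is not part of the commit: plain JDK types (BufferedReader as a stand-in for NiFi's RecordReader, a List as a stand-in for the RecordSetWriter) are used to show the control flow, and the class name is made up for the example.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZeroOrMoreRecordsSketch {
    public static void main(final String[] args) throws IOException {
        // Stand-ins for Kafka message payloads: an empty message, a single-record
        // message, and a multi-record message (records are newline-delimited here).
        final List<String> messages = Arrays.asList("", "alpha\n", "bravo\ncharlie\ndelta\n");

        final List<String> written = new ArrayList<>(); // stand-in for the RecordSetWriter
        long recordCount = 0;                           // analogous to the 'record.count' attribute

        for (final String message : messages) {
            try (final BufferedReader reader = new BufferedReader(new StringReader(message))) {
                final String firstRecord = reader.readLine();
                if (firstRecord == null) {
                    // The message contains no records: do nothing and move on,
                    // mirroring the new "if (firstRecord == null) { continue; }" branch.
                    continue;
                }
                // Drain the remainder of the message: zero or more further records.
                for (String record = firstRecord; record != null; record = reader.readLine()) {
                    written.add(record);
                    recordCount++;
                }
            }
        }

        // Prints "4 records: [alpha, bravo, charlie, delta]" -- the empty message adds nothing.
        System.out.println(recordCount + " records: " + written);
    }
}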
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58e4fb57
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58e4fb57
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58e4fb57

Branch: refs/heads/master
Commit: 58e4fb576e457b3ce515a1b22ec383b606efd297
Parents: 36b16c9
Author: Koji Kawamura
Authored: Mon Jun 5 21:40:53 2017 +0900
Committer: Mark Payne
Committed: Mon Oct 2 15:40:29 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafkaRecord_0_10.java   |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 151 +++++++++----
 2 files changed, 75 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index c44e25e..adb7a6f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -55,11 +55,11 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API. "
-    + "The complementary NiFi processor for sending messages is PublishKafka_0_10. Please note that, at this time, the Processor assumes that "
+    + "The complementary NiFi processor for sending messages is PublishKafkaRecord_0_10. Please note that, at this time, the Processor assumes that "
     + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
     + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
     + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
-    + "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile.")
+    + "A 'record.count' attribute is added to indicate how many records are contained in the FlowFile.")
 @Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e4fb57/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 64af412..8dc13f4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -432,105 +433,99 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         bundleMap.put(bundleInfo, tracker);
     }
 
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
-        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
-            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-    }
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
-        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> messages, final TopicPartition topicPartition) {
+        RecordSetWriter writer = null;
 
-        FlowFile failureFlowFile = session.create();
-        final byte[] value = consumerRecord.value();
-        if (value != null) {
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
-        }
-        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+        final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
+            // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+            // And continue to the next message.
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+            attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+            attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
-        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+            FlowFile failureFlowFile = session.create();
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+            session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
-        if (cause == null) {
-            logger.error(message);
-        } else {
-            logger.error(message, cause);
-        }
+            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+            logger.error("Failed to parse message from Kafka using the configured Record Reader. "
                + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
 
-        session.adjustCounter("Parse Failures", 1, false);
-    }
-
-    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        RecordSetWriter writer = null;
+            session.adjustCounter("Parse Failures", 1, false);
+        };
 
         try {
-            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
-                final Record record;
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
                 try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
+                    final RecordReader reader;
+                    final Record firstRecord;
 
-                // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
-                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
-
-                BundleTracker tracker = bundleMap.get(bundleInfo);
-                if (tracker == null) {
-                    FlowFile flowFile = session.create();
-                    final OutputStream rawOut = session.write(flowFile);
-
-                    final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+                        firstRecord = reader.nextRecord();
                     } catch (final Exception e) {
-                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+                        handleParseFailure.accept(consumerRecord, e);
+                        continue;
+                    }
+                    if (firstRecord == null) {
+                        // If the message doesn't contain any record, do nothing.
+                        continue;
+                    }
+
+                    // Determine the bundle for this record.
+                    final RecordSchema recordSchema = firstRecord.getSchema();
+                    final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
+
+                    BundleTracker tracker = bundleMap.get(bundleInfo);
+                    if (tracker == null) {
+                        FlowFile flowFile = session.create();
+                        final OutputStream rawOut = session.write(flowFile);
+
+                        final RecordSchema writeSchema;
                         try {
-                        rollback(topicPartition);
-                    } catch (final Exception rollbackException) {
-                        logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+                        } catch (final Exception e) {
+                            logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                            try {
+                                rollback(topicPartition);
+                            } catch (final Exception rollbackException) {
+                                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                            }
+
+                            yield();
+                            throw new ProcessException(e);
                         }
-                    yield();
-                    throw new ProcessException(e);
-                }
-
-                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                    writer.beginRecordSet();
+                        writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                        writer.beginRecordSet();
 
-                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                    tracker.updateFlowFile(flowFile);
-                    bundleMap.put(bundleInfo, tracker);
-                } else {
-                    writer = tracker.recordWriter;
-                }
+                        tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                        tracker.updateFlowFile(flowFile);
+                        bundleMap.put(bundleInfo, tracker);
+                    } else {
+                        writer = tracker.recordWriter;
+                    }
 
-                try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
+                    try {
+                        for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
+                            writer.write(record);
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", 1, false);
+                        }
+                    } catch (Exception e) {
+                        // Transfer it to 'parse failure' and continue to the next message.
+                        handleParseFailure.accept(consumerRecord, e);
+                    }
                 }
-
-                tracker.incrementRecordCount(1L);
             }
-
-            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);