nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [1/2] nifi git commit: NIFI-3739: This closes #1728. Pass the proper InputStream to RecordSetWriterFactory in order to obtain RecordSetWriter; also fix error handling so that we don't kill kafka client if unable to create writer, since we roll back the o
Date Tue, 02 May 2017 05:03:41 GMT
Repository: nifi
Updated Branches:
  refs/heads/master b96e402e7 -> 016ae3191


NIFI-3739: This closes #1728. Pass the proper InputStream to RecordSetWriterFactory in order
to obtain RecordSetWriter; also fix error handling so that we don't kill kafka client if unable
to create writer, since we roll back the offsets

Signed-off-by: joewitt <joewitt@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40de1b18
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40de1b18
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40de1b18

Branch: refs/heads/master
Commit: 40de1b18d9e6c55cdf39842c5200b5d43f10dbde
Parents: b96e402
Author: Mark Payne <markap14@hotmail.com>
Authored: Tue May 2 00:18:47 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Tue May 2 00:21:15 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/pubsub/ConsumerLease.java   | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/40de1b18/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b2665ae..ac24e1f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -171,6 +171,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
             lastPollEmpty = records.count() == 0;
             processRecords(records);
+        } catch (final ProcessException pe) {
+            throw pe;
         } catch (final Throwable t) {
             this.poison();
             throw t;
@@ -405,11 +407,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     }
 
     private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[],
byte[]>> records, final TopicPartition topicPartition) {
+        if (records.isEmpty()) {
+            return;
+        }
+
         FlowFile flowFile = session.create();
         try {
             final RecordSetWriter writer;
             try {
-                writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new
byte[0]));
+                writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value()));
             } catch (final Exception e) {
                 logger.error(
                     "Failed to obtain a Record Writer for serializing Kafka messages. This
generally happens because the "


Mime
View raw message