Repository: nifi
Updated Branches:
refs/heads/master b96e402e7 -> 016ae3191
NIFI-3739: This closes #1728. Pass the proper InputStream to RecordSetWriterFactory in order
to obtain RecordSetWriter; also fix error handling so that we don't kill kafka client if unable
to create writer, since we roll back the offsets
Signed-off-by: joewitt <joewitt@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40de1b18
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40de1b18
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40de1b18
Branch: refs/heads/master
Commit: 40de1b18d9e6c55cdf39842c5200b5d43f10dbde
Parents: b96e402
Author: Mark Payne <markap14@hotmail.com>
Authored: Tue May 2 00:18:47 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Tue May 2 00:21:15 2017 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/40de1b18/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b2665ae..ac24e1f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -171,6 +171,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
lastPollEmpty = records.count() == 0;
processRecords(records);
+ } catch (final ProcessException pe) {
+ throw pe;
} catch (final Throwable t) {
this.poison();
throw t;
@@ -405,11 +407,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[],
byte[]>> records, final TopicPartition topicPartition) {
+ if (records.isEmpty()) {
+ return;
+ }
+
FlowFile flowFile = session.create();
try {
final RecordSetWriter writer;
try {
- writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new
byte[0]));
+ writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value()));
} catch (final Exception e) {
logger.error(
"Failed to obtain a Record Writer for serializing Kafka messages. This
generally happens because the "
|