nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [2/2] nifi git commit: NIFI-4639: Updated PublishKafka 1.0 processor to use a fresh writer for each output record as well. This closes #2292.
Date Fri, 08 Dec 2017 14:14:30 GMT
NIFI-4639: Updated PublishKafka 1.0 processor to use a fresh writer for each output record
as well. This closes #2292.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/113ad5ec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/113ad5ec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/113ad5ec

Branch: refs/heads/master
Commit: 113ad5ecfa8990c97a56b8fc31656f3542735906
Parents: c9cc76b
Author: Mark Payne <markap14@hotmail.com>
Authored: Fri Dec 8 09:13:52 2017 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Fri Dec 8 09:14:17 2017 -0500

----------------------------------------------------------------------
 .../processors/kafka/pubsub/PublisherLease.java |  8 ++--
 .../kafka/pubsub/TestPublisherLease.java        | 44 ++++++++++++++++++++
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/113ad5ec/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index abcd15f..d18df7f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -148,13 +148,15 @@ public class PublisherLease implements Closeable {
         Record record;
         int recordCount = 0;
 
-        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos))
{
+        try {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
 
-                writer.write(record);
-                writer.flush();
+                try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema,
baos)) {
+                    writer.write(record);
+                    writer.flush();
+                }
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/113ad5ec/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 54c1222..64451d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -35,6 +37,16 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,4 +199,36 @@ public class TestPublisherLease {
 
         verify(producer, times(1)).flush();
     }
+
+
+    @Test
+    public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException,
MalformedRecordException {
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger,
true, null, StandardCharsets.UTF_8);
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
+
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("person_id", RecordFieldType.LONG);
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(),
new ByteArrayInputStream(exampleInput), logger);
+        final RecordSet recordSet = reader.createRecordSet();
+        final RecordSchema schema = reader.getSchema();
+
+        final String topic = "unit-test";
+        final String keyField = "person_id";
+
+        final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
+        final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+
+        Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
+
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
+
+        verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
+        verify(writer, times(2)).write(any(Record.class));
+        verify(producer, times(2)).send(any(), any());
+    }
 }


Mime
View raw message