nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject nifi git commit: NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvroToJSON.
Date Mon, 20 Jun 2016 17:46:15 GMT
Repository: nifi
Updated Branches:
  refs/heads/0.x e016efa0d -> 15dad1ce0


NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvroToJSON.

 - Made suggested changes and removed unused imports found by checkstyle
 - This closes #459.

Signed-off-by: Mark Payne <markap14@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15dad1ce
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15dad1ce
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15dad1ce

Branch: refs/heads/0.x
Commit: 15dad1ce0e0a84f89c8eb8e945c1e17cc1e281ab
Parents: e016efa
Author: Ryan Persaud <ryan.persaud@gmail.com>
Authored: Fri May 20 15:27:39 2016 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Mon Jun 20 13:45:46 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 103 +++++++++++-----
 .../processors/avro/TestConvertAvroToJSON.java  | 118 +++++++++++++++++++
 2 files changed, 190 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/15dad1ce/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index d9fa4ff..2ddf66e 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -28,10 +28,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -49,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 
 @SideEffectFree
 @SupportsBatching
@@ -81,6 +86,12 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .defaultValue("false")
         .required(true)
         .build();
+    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+        .name("Avro schema")
+        .description("If the Avro records do not contain the schema (datum only), it must
be specified here.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -92,6 +103,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .build();
 
     private List<PropertyDescriptor> properties;
+    private volatile Schema schema = null;
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -100,6 +112,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
         properties.add(WRAP_SINGLE_RECORD);
+        properties.add(SCHEMA);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -128,49 +141,77 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         // Wrap a single record (inclusive of no records) only when a container is being
used
         final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean()
&& useContainer;
 
+        final String stringSchema = context.getProperty(SCHEMA).getValue();
+        final boolean schemaLess = stringSchema != null;
+
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws
IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn);
-                         final OutputStream out = new BufferedOutputStream(rawOut);
-                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
new GenericDatumReader<GenericRecord>())) {
+                    final GenericData genericData = GenericData.get();
 
-                        final GenericData genericData = GenericData.get();
-
-                        int recordCount = 0;
-                        GenericRecord currRecord = null;
-                        if (reader.hasNext()) {
-                            currRecord = reader.next();
-                            recordCount++;
+                    if (schemaLess) {
+                        if (schema == null) {
+                            schema = new Schema.Parser().parse(stringSchema);
                         }
+                        try (final InputStream in = new BufferedInputStream(rawIn);
+                             final OutputStream out = new BufferedOutputStream(rawOut)) {
+                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in,
null);
+                            final GenericRecord record = reader.read(null, decoder);
+
+                            // Schemaless records are singletons, so both useContainer and
wrapSingleRecord
+                            // need to be true before we wrap it with an array
+                            if (useContainer && wrapSingleRecord) {
+                                out.write('[');
+                            }
 
-                        // Open container if desired output is an array format and there
are multiple records or
-                        // if configured to wrap single record
-                        if (reader.hasNext() && useContainer || wrapSingleRecord)
{
-                            out.write('[');
-                        }
+                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT
: genericData.toString(record).getBytes(StandardCharsets.UTF_8);
+                            out.write(outputBytes);
 
-                        // Determine the initial output record, inclusive if we should have
an empty set of Avro records
-                        final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
: genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
-                        out.write(outputBytes);
+                            if (useContainer && wrapSingleRecord) {
+                                out.write(']');
+                            }
+                        }
+                    } else {
+                        try (final InputStream in = new BufferedInputStream(rawIn);
+                             final OutputStream out = new BufferedOutputStream(rawOut);
+                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
new GenericDatumReader<GenericRecord>())) {
+
+                            int recordCount = 0;
+                            GenericRecord currRecord = null;
+                            if (reader.hasNext()) {
+                                currRecord = reader.next();
+                                recordCount++;
+                            }
 
-                        while (reader.hasNext()) {
-                            if (useContainer) {
-                                out.write(',');
-                            } else {
-                                out.write('\n');
+                            // Open container if desired output is an array format and there
are multiple records or
+                            // if configured to wrap single record
+                            if (reader.hasNext() && useContainer || wrapSingleRecord)
{
+                                out.write('[');
                             }
 
-                            currRecord = reader.next(currRecord);
-                            out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
-                            recordCount++;
-                        }
+                            // Determine the initial output record, inclusive if we should
have an empty set of Avro records
+                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
: genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
+                            out.write(outputBytes);
+
+                            while (reader.hasNext()) {
+                                if (useContainer) {
+                                    out.write(',');
+                                } else {
+                                    out.write('\n');
+                                }
 
-                        // Close container if desired output is an array format and there
are multiple records or if
-                        // configured to wrap a single record
-                        if (recordCount > 1 && useContainer || wrapSingleRecord)
{
-                            out.write(']');
+                                currRecord = reader.next(currRecord);
+                                out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
+                                recordCount++;
+                            }
+
+                            // Close container if desired output is an array format and there
are multiple records or if
+                            // configured to wrap a single record
+                            if (recordCount > 1 && useContainer || wrapSingleRecord)
{
+                                out.write(']');
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/15dad1ce/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 856677a..0884eb3 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -24,7 +24,9 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -119,6 +121,122 @@ public class TestConvertAvroToJSON {
         out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\":
null}");
     }
 
+    @Test
+    public void testSingleSchemalessAvroMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\":
null}");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\":
null}");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_wrapSingleMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\":
null}]");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() throws IOException
{
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\":
null}");
+    }
 
     @Test
     public void testMultipleAvroMessages() throws IOException {


Mime
View raw message