Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 05D49200B16 for ; Mon, 20 Jun 2016 19:46:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 04A9C160A55; Mon, 20 Jun 2016 17:46:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D35F6160A26 for ; Mon, 20 Jun 2016 19:46:16 +0200 (CEST) Received: (qmail 64478 invoked by uid 500); 20 Jun 2016 17:46:16 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 64469 invoked by uid 99); 20 Jun 2016 17:46:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jun 2016 17:46:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CC896DFC6F; Mon, 20 Jun 2016 17:46:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: markap14@apache.org To: commits@nifi.apache.org Message-Id: <8ee6225484dc4a17a1be546c0c42fb62@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: nifi git commit: NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvrotoJson. Date: Mon, 20 Jun 2016 17:46:15 +0000 (UTC) archived-at: Mon, 20 Jun 2016 17:46:18 -0000 Repository: nifi Updated Branches: refs/heads/0.x e016efa0d -> 15dad1ce0 NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvrotoJson. - Made suggested changes and removed unused imports found by checkstyle - This closes #459. Signed-off-by: Mark Payne Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15dad1ce Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15dad1ce Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15dad1ce Branch: refs/heads/0.x Commit: 15dad1ce0e0a84f89c8eb8e945c1e17cc1e281ab Parents: e016efa Author: Ryan Persaud Authored: Fri May 20 15:27:39 2016 -0400 Committer: Mark Payne Committed: Mon Jun 20 13:45:46 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/avro/ConvertAvroToJSON.java | 103 +++++++++++----- .../processors/avro/TestConvertAvroToJSON.java | 118 +++++++++++++++++++ 2 files changed, 190 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/15dad1ce/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index d9fa4ff..2ddf66e 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -28,10 +28,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -49,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; @SideEffectFree @SupportsBatching @@ -81,6 +86,12 @@ public class ConvertAvroToJSON extends AbstractProcessor { .defaultValue("false") .required(true) .build(); + static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("Avro schema") + .description("If the Avro records do not contain the schema (datum only), it must be specified here.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -92,6 +103,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { .build(); private List properties; + private volatile Schema schema = null; @Override protected void init(ProcessorInitializationContext context) { @@ -100,6 +112,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { final List properties = new ArrayList<>(); properties.add(CONTAINER_OPTIONS); properties.add(WRAP_SINGLE_RECORD); + properties.add(SCHEMA); this.properties = Collections.unmodifiableList(properties); } @@ -128,49 +141,77 @@ public class ConvertAvroToJSON extends AbstractProcessor { // Wrap a single record (inclusive of no records) only when a container is being used final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; + final String stringSchema = context.getProperty(SCHEMA).getValue(); + final boolean schemaLess = stringSchema != null; + try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn); - final OutputStream out = new BufferedOutputStream(rawOut); - final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + final GenericData genericData = GenericData.get(); - final GenericData genericData = GenericData.get(); - - int recordCount = 0; - GenericRecord currRecord = null; - if (reader.hasNext()) { - currRecord = reader.next(); - recordCount++; + if (schemaLess) { + if (schema == null) { + schema = new Schema.Parser().parse(stringSchema); } + try (final InputStream in = new BufferedInputStream(rawIn); + final OutputStream out = new BufferedOutputStream(rawOut)) { + final DatumReader reader = new GenericDatumReader(schema); + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); + final GenericRecord record = reader.read(null, decoder); + + // Schemaless records are singletons, so both useContainer and wrapSingleRecord + // need to be true before we wrap it with an array + if (useContainer && wrapSingleRecord) { + out.write('['); + } - // Open container if desired output is an array format and there are are multiple records or - // if configured to wrap single record - if (reader.hasNext() && useContainer || wrapSingleRecord) { - out.write('['); - } + final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8); + out.write(outputBytes); - // Determine the initial output record, inclusive if we should have an empty set of Avro records - final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8); - out.write(outputBytes); + if (useContainer && wrapSingleRecord) { + out.write(']'); + } + } + } else { + try (final InputStream in = new BufferedInputStream(rawIn); + final OutputStream out = new BufferedOutputStream(rawOut); + final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + + int recordCount = 0; + GenericRecord currRecord = null; + if (reader.hasNext()) { + currRecord = reader.next(); + recordCount++; + } - while (reader.hasNext()) { - if (useContainer) { - out.write(','); - } else { - out.write('\n'); + // Open container if desired output is an array format and there are are multiple records or + // if configured to wrap single record + if (reader.hasNext() && useContainer || wrapSingleRecord) { + out.write('['); } - currRecord = reader.next(currRecord); - out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); - recordCount++; - } + // Determine the initial output record, inclusive if we should have an empty set of Avro records + final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8); + out.write(outputBytes); + + while (reader.hasNext()) { + if (useContainer) { + out.write(','); + } else { + out.write('\n'); + } - // Close container if desired output is an array format and there are multiple records or if - // configured to wrap a single record - if (recordCount > 1 && useContainer || wrapSingleRecord) { - out.write(']'); + currRecord = reader.next(currRecord); + out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); + recordCount++; + } + + // Close container if desired output is an array format and there are multiple records or if + // configured to wrap a single record + if (recordCount > 1 && useContainer || wrapSingleRecord) { + out.write(']'); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/15dad1ce/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 856677a..0884eb3 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -24,7 +24,9 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -119,6 +121,122 @@ public class TestConvertAvroToJSON { out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); } + @Test + public void testSingleSchemalessAvroMessage() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + String stringSchema = schema.toString(); + runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.write(user1, encoder); + + encoder.flush(); + out1.flush(); + byte[] test = out1.toByteArray(); + runner.enqueue(test); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } + + @Test + public void testSingleSchemalessAvroMessage_noContainer() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + String stringSchema = schema.toString(); + runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.write(user1, encoder); + + encoder.flush(); + out1.flush(); + byte[] test = out1.toByteArray(); + runner.enqueue(test); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } + + @Test + public void testSingleSchemalessAvroMessage_wrapSingleMessage() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + String stringSchema = schema.toString(); + runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.write(user1, encoder); + + encoder.flush(); + out1.flush(); + byte[] test = out1.toByteArray(); + runner.enqueue(test); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]"); + } + + @Test + public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + String stringSchema = schema.toString(); + runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.write(user1, encoder); + + encoder.flush(); + out1.flush(); + byte[] test = out1.toByteArray(); + runner.enqueue(test); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } @Test public void testMultipleAvroMessages() throws IOException {