nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi] tpalfy commented on a change in pull request #3724: NIFI-6640 - UNION/CHOICE types not handled correctly
Date Mon, 16 Sep 2019 09:33:01 GMT
tpalfy commented on a change in pull request #3724: NIFI-6640 - UNION/CHOICE types not handled
correctly
URL: https://github.com/apache/nifi/pull/3724#discussion_r324581268
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestConversion.java
 ##########
 @@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.csv.CSVReader;
+import org.apache.nifi.csv.CSVRecordSetWriter;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.xml.XMLReader;
+import org.apache.nifi.xml.XMLRecordSetWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractTestConversion {
+    protected RecordReaderFactory reader;
+    protected Consumer<TestRunner> inputHandler;
+    protected Consumer<TestRunner> readerConfigurer;
+
+    protected RecordSetWriterFactory writer;
+    protected Consumer<MockFlowFile> resultHandler;
+    protected Consumer<TestRunner> writerConfigurer;
+
+    @Before
+    public void setUp() throws Exception {
+        reader = null;
+        inputHandler = null;
+        readerConfigurer = null;
+
+        writer = null;
+        resultHandler = null;
+        writerConfigurer = null;
+    }
+
+    @Test
+    public void testCsvToJson() throws Exception {
+        fromCsv(csvPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvro() throws Exception {
+        fromCsv(csvPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvroToCsv() throws Exception {
+        fromCsv(csvPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toCsv(csvPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testCsvToXml() throws Exception {
+        fromCsv(csvPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testJsonToCsv() throws Exception {
+        fromJson(jsonPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvroToJson() throws Exception {
+        fromJson(jsonPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toJson(jsonPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testAvroToCsv() throws Exception {
+        fromAvro(avroPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testAvroToJson() throws Exception {
+        fromAvro(avroPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testAvroToXml() throws Exception {
+        fromAvro(avroPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testXmlToCsv() throws Exception {
+        fromXml(xmlPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testXmlToJson() throws Exception {
+        fromXml(xmlPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvro() throws Exception {
+        fromXml(xmlPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler,
resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvroToXml() throws Exception {
+        fromXml(xmlPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toXml(xmlPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    abstract protected String csvPostfix();
+
+    abstract protected String jsonPostfix();
+
+    abstract protected String avroPostfix();
+
+    abstract protected String xmlPostfix();
+
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+    }
+
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+    }
+
+    protected void fromCsv(String postfix) {
+        reader = new CSVReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromJson(String postfix) {
+        reader = new JsonTreeReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromXml(String postfix) {
+        reader = new XMLReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+            testRunner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
+        };
+    }
+
+    protected void fromAvro(String postfix) {
+        reader = new AvroReader();
+        inputHandler = byteInputHandler(getByteContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void toCsv(String postfix) {
+        writer = new CSVRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected void toJson(String postfix) {
+        writer = new JsonRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "Pretty Print JSON", "true");
+        };
+    }
+
+    protected void toXml(String postfix) {
+        writer = new XMLRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "pretty_print_xml", "true");
+            testRunner.setProperty(writer, "root_tag_name", "root");
+            testRunner.setProperty(writer, "record_tag_name", "nifiRecord");
+        };
+    }
+
+    protected void toAvro(String postfix) {
+        writer = new AvroRecordSetWriter();
+        resultHandler = mockFlowFile -> {
+            try {
+                ArrayList<Map<String, Object>> expected = getRecords(getByteContent(postfix));
+                ArrayList<Map<String, Object>> actual = getRecords(mockFlowFile.toByteArray());
+
+                assertEquals(expected, actual);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected Consumer<TestRunner> stringInputHandler(String input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<TestRunner> byteInputHandler(byte[] input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<MockFlowFile> stringOutputHandler(String expected) {
+        return mockFlowFile -> mockFlowFile.assertContentEquals(expected);
+    }
+
+    protected String getContent(String postfix) {
+        String content = new String(getByteContent(postfix));
+
+        return content;
+    }
+
+    protected byte[] getByteContent(String postfix) {
+        byte[] content;
+        try {
+            content = Files.readAllBytes(Paths.get("src/test/resources/TestConversions/data.int_float_string."
+ postfix));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return content;
+    }
+
+    protected ArrayList<Map<String, Object>> getRecords(byte[] avroData) throws
IOException, MalformedRecordException {
+        RecordReader reader = new AvroReaderWithEmbeddedSchema(new ByteArrayInputStream(avroData));
+
+        return getRecords(reader);
+    }
+
+    protected ArrayList<Map<String, Object>> getRecords(RecordReader reader)
throws IOException, MalformedRecordException {
+        try {
+            ArrayList<Map<String, Object>> records = new ArrayList<>();
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                records.add(record.toMap());
+            }
+
+            return records;
+        } finally {
+            reader.close();
 
 Review comment:
   Good point!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message