nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [1/2] nifi git commit: NIFI-4004: Use RecordReaderFactory without FlowFile.
Date Fri, 08 Sep 2017 16:38:19 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 20d23e836 -> 1f67cbf62


http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 88b657c..eed37f8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -31,7 +31,6 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -66,11 +65,11 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry
schemaRegistry, ConfigurationContext context) {
-        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry
schemaRegistry, ConfigurationContext context) {
+        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(strategy)) {
             return new EmbeddedAvroSchemaAccessStrategy();
         } else {
-            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+            return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
         }
     }
 
@@ -84,12 +83,12 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in,
final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException
{
+    public RecordReader createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException
{
         final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
             return new AvroReaderWithEmbeddedSchema(in);
         } else {
-            final RecordSchema recordSchema = getSchema(flowFile, in, null);
+            final RecordSchema recordSchema = getSchema(variables, in, null);
 
             final Schema avroSchema;
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index a8459a9..7e49841 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -36,7 +36,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.schema.access.SchemaField;
@@ -80,7 +79,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
         "The FlowFile will have the Avro schema embedded into the content, as is typical
with Avro");
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema,
final FlowFile flowFile, final OutputStream out) throws IOException {
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema,
final OutputStream out) throws IOException {
         final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
         final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
index d32e3e5..b289cbe 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
@@ -20,13 +20,13 @@ package org.apache.nifi.avro;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy
{
     private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT);
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream,
final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(Map<String, String> variables, final InputStream
contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException
{
         final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream,
new GenericDatumReader<GenericRecord>());
         final Schema avroSchema = dataFileStream.getSchema();
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index fa60b2a..642f360 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -23,6 +23,7 @@ import java.io.Reader;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.csv.CSVFormat;
@@ -30,7 +31,6 @@ import org.apache.commons.csv.CSVParser;
 import org.apache.commons.io.input.BOMInputStream;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream,
final RecordSchema readSchema) throws SchemaNotFoundException {
+    public RecordSchema getSchema(Map<String, String> variables, final InputStream
contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
         if (this.context == null) {
             throw new SchemaNotFoundException("Schema Access Strategy intended only for validation
purposes and cannot obtain schema");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 135dd80..f15f85d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -31,7 +32,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -99,23 +99,23 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in,
final ComponentLog logger) throws IOException, SchemaNotFoundException {
+    public RecordReader createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
         // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream
for the header.
         final BufferedInputStream bufferedIn = new BufferedInputStream(in);
         bufferedIn.mark(1024 * 1024);
-        final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn),
null);
+        final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn),
null);
         bufferedIn.reset();
 
         return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader,
ignoreHeader, dateFormat, timeFormat, timestampFormat);
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final
SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry
schemaRegistry, final ConfigurationContext context) {
+        if (strategy.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
             return new CSVHeaderSchemaStrategy(context);
         }
 
-        return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+        return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index c5e6b19..bd2e600 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -69,7 +68,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements
R
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException
{
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final OutputStream out) throws SchemaNotFoundException, IOException {
         return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null),
includeHeader);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 54a2333..30c7dd3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -35,7 +35,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
@@ -203,20 +202,20 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final
SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue()))
{
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry
schemaRegistry, final ConfigurationContext context) {
+        if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
             return createAccessStrategy();
         } else {
-            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+            return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
         }
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final
SchemaRegistry schemaRegistry, final ValidationContext context) {
-        if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue()))
{
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry
schemaRegistry, final ValidationContext context) {
+        if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
             return createAccessStrategy();
         } else {
-            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+            return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
         }
     }
 
@@ -224,8 +223,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
         return new SchemaAccessStrategy() {
             private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
 
+
             @Override
-            public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream,
final RecordSchema readSchema) throws SchemaNotFoundException {
+            public RecordSchema getSchema(Map<String, String> variables, InputStream
contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
                 return recordSchema;
             }
 
@@ -237,8 +237,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in,
final ComponentLog logger) throws IOException, SchemaNotFoundException {
-        final RecordSchema schema = getSchema(flowFile, in, null);
+    public RecordReader createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final RecordSchema schema = getSchema(variables, in, null);
         return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index 45cbbd1..cab4449 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -34,7 +35,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeUtils;
@@ -128,8 +128,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in,
final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException
{
-        final RecordSchema schema = getSchema(flowFile, in, null);
+    public RecordReader createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException
{
+        final RecordSchema schema = getSchema(variables, in, null);
         return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat,
timestampFormat);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index 9a722c1..cbe2f59 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -64,7 +63,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException
{
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final OutputStream out) throws SchemaNotFoundException, IOException {
         return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 063c9df..8290284 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -28,7 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeUtils;
@@ -69,7 +69,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in,
final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException
{
-        return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat,
timeFormat, timestampFormat);
+    public RecordReader createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException
{
+        return new JsonTreeRowRecordReader(in, logger, getSchema(variables, in, null), dateFormat,
timeFormat, timestampFormat);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index fb28b17..4853e4b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -147,22 +147,22 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         return schemaAccessStrategyList;
     }
 
-    protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
-        if (allowableValue == null) {
+    protected SchemaAccessWriter getSchemaWriteStrategy(final String strategy) {
+        if (strategy == null) {
             return null;
         }
 
-        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
+        if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
             return new SchemaNameAsAttribute();
-        } else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
+        } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
             return new WriteAvroSchemaAttributeStrategy();
-        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue()))
{
+        } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
             return new HortonworksEncodedSchemaReferenceWriter();
-        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue()))
{
+        } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
             return new HortonworksAttributeSchemaReferenceWriter();
-        } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue()))
{
+        } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
             return new ConfluentSchemaRegistryWriter();
-        } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
+        } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
             return new NopSchemaAccessWriter();
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index 53b030a..502d548 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -24,7 +24,6 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaField;
@@ -41,6 +40,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
@@ -57,7 +57,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService
{
 
     private volatile ConfigurationContext configurationContext;
     private volatile SchemaAccessStrategy schemaAccessStrategy;
-    private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
+    private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
 
     private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA,
CONFLUENT_ENCODED_SCHEMA));
@@ -108,22 +108,17 @@ public abstract class SchemaRegistryService extends AbstractControllerService
{
         return schemaAccessStrategy;
     }
 
-    public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream,
final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+    public final RecordSchema getSchema(final Map<String, String> variables, final
InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException,
IOException {
         final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
         if (accessStrategy == null) {
             throw new SchemaNotFoundException("Could not determine the Schema Access Strategy
for this service");
         }
 
-        return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema);
+        return getSchemaAccessStrategy().getSchema(variables, contentStream, readSchema);
     }
 
-    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema)
throws SchemaNotFoundException, IOException {
-        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
-        if (accessStrategy == null) {
-            throw new SchemaNotFoundException("Could not determine the Schema Access Strategy
for this service");
-        }
-
-        return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema);
+    public RecordSchema getSchema(final Map<String, String> variables, final RecordSchema
readSchema) throws SchemaNotFoundException, IOException {
+        return getSchema(variables, EMPTY_INPUT_STREAM, readSchema);
     }
 
     @Override
@@ -148,12 +143,12 @@ public abstract class SchemaRegistryService extends AbstractControllerService
{
         return suppliedFields;
     }
 
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final
SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (allowableValue == null) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry
schemaRegistry, final ConfigurationContext context) {
+        if (strategy == null) {
             return null;
         }
 
-        return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry,
context);
+        return SchemaAccessUtils.getSchemaAccessStrategy(strategy, schemaRegistry, context);
     }
 
     protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final
SchemaRegistry schemaRegistry, final ValidationContext context) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1f67cbf6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
index 7057e43..1ed2aba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.text;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -29,10 +28,11 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.InheritSchemaFromRecord;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
@@ -77,12 +77,13 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter
i
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final FlowFile flowFile, final OutputStream out) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema,
final OutputStream out) {
         return new FreeFormTextWriter(textValue, characterSet, out);
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema)
throws SchemaNotFoundException, IOException {
-        return readSchema;
+    protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry
schemaRegistry, ConfigurationContext context) {
+        return new InheritSchemaFromRecord();
     }
+
 }


Mime
View raw message