nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject [09/19] nifi git commit: NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types
Date Tue, 11 Apr 2017 23:33:32 GMT
NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types

Signed-off-by: Matt Burgess <mattyb149@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a88d3bfa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a88d3bfa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a88d3bfa

Branch: refs/heads/master
Commit: a88d3bfa3c53d9cbe375f2b89eaa9248eb92df29
Parents: 4d5872a
Author: Mark Payne <markap14@hotmail.com>
Authored: Mon Jul 11 14:57:00 2016 -0400
Committer: Matt Burgess <mattyb149@apache.org>
Committed: Tue Apr 11 19:29:04 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  59 +-
 .../apache/nifi/util/MockProcessSession.java    |   4 +-
 .../nifi/cluster/manager/NodeResponse.java      |   2 +-
 .../repository/StandardProcessSession.java      |   8 +-
 .../nifi/processor/SimpleProcessLogger.java     |  75 ++-
 .../src/main/resources/META-INF/NOTICE          |  16 +
 .../nifi-standard-processors/pom.xml            |  27 +-
 .../calcite/adapter/csv/CsvEnumerator2.java     | 303 -----------
 .../apache/calcite/adapter/csv/CsvSchema2.java  |  98 ----
 .../calcite/adapter/csv/CsvSchemaFactory2.java  |  53 --
 .../calcite/adapter/csv/CsvTableScan2.java      | 104 ----
 .../adapter/csv/CsvTranslatableTable2.java      | 121 -----
 .../processors/standard/FilterCSVColumns.java   | 258 ---------
 .../nifi/processors/standard/QueryFlowFile.java | 541 +++++++++++++++++++
 .../nifi/queryflowfile/FlowFileEnumerator.java  | 150 +++++
 .../FlowFileProjectTableScanRule.java           |  76 +++
 .../nifi/queryflowfile/FlowFileTable.java       | 203 +++++++
 .../nifi/queryflowfile/FlowFileTableScan.java   |  91 ++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |  47 ++
 .../standard/TestFilterCSVColumns.java          | 117 ----
 .../processors/standard/TestQueryFlowFile.java  | 379 +++++++++++++
 .../resources/TestFilterCSVColumns/Numeric.csv  |   5 -
 .../resources/TestFilterCSVColumns/US500.csv    |   1 -
 .../TestFilterCSVColumns/US500_typeless.csv     |   1 -
 .../pom.xml                                     |  31 ++
 .../nifi/serialization/DataTypeValidator.java   |  82 +++
 .../serialization/MalformedRecordException.java |  31 ++
 .../apache/nifi/serialization/RecordReader.java |  55 ++
 .../nifi/serialization/RecordSetWriter.java     |  45 ++
 .../serialization/RecordSetWriterFactory.java   |  30 +
 .../apache/nifi/serialization/RecordWriter.java |  41 ++
 .../serialization/RowRecordReaderFactory.java   |  33 ++
 .../nifi/serialization/SimpleRecordSchema.java  | 126 +++++
 .../apache/nifi/serialization/WriteResult.java  |  69 +++
 .../nifi/serialization/record/DataType.java     |  95 ++++
 .../serialization/record/ListRecordSet.java     |  44 ++
 .../nifi/serialization/record/MapRecord.java    | 322 +++++++++++
 .../nifi/serialization/record/Record.java       |  62 +++
 .../nifi/serialization/record/RecordField.java  |  64 +++
 .../serialization/record/RecordFieldType.java   | 114 ++++
 .../nifi/serialization/record/RecordSchema.java |  58 ++
 .../nifi/serialization/record/RecordSet.java    |  53 ++
 .../record/ResultSetRecordSet.java              | 169 ++++++
 .../record/TypeMismatchException.java           |  28 +
 .../pom.xml                                     |  41 ++
 .../src/main/resources/META-INF/LICENSE         | 269 +++++++++
 .../src/main/resources/META-INF/NOTICE          |  77 +++
 .../.gitignore                                  |   1 +
 .../nifi-record-serialization-services/pom.xml  |  94 ++++
 .../java/org/apache/nifi/avro/AvroReader.java   |  40 ++
 .../org/apache/nifi/avro/AvroRecordReader.java  | 254 +++++++++
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  67 +++
 .../apache/nifi/avro/AvroSchemaValidator.java   |  45 ++
 .../org/apache/nifi/avro/WriteAvroResult.java   | 286 ++++++++++
 .../java/org/apache/nifi/csv/CSVReader.java     |  49 ++
 .../org/apache/nifi/csv/CSVRecordReader.java    | 216 ++++++++
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |  37 ++
 .../org/apache/nifi/csv/WriteCSVResult.java     | 127 +++++
 .../nifi/grok/GrokExpressionValidator.java      |  48 ++
 .../java/org/apache/nifi/grok/GrokReader.java   |  99 ++++
 .../org/apache/nifi/grok/GrokRecordReader.java  | 323 +++++++++++
 .../nifi/json/AbstractJsonRowRecordReader.java  | 307 +++++++++++
 .../org/apache/nifi/json/JsonPathReader.java    | 126 +++++
 .../nifi/json/JsonPathRowRecordReader.java      | 280 ++++++++++
 .../org/apache/nifi/json/JsonPathValidator.java |  60 ++
 .../apache/nifi/json/JsonRecordSetWriter.java   |  66 +++
 .../org/apache/nifi/json/JsonTreeReader.java    |  56 ++
 .../nifi/json/JsonTreeRowRecordReader.java      | 115 ++++
 .../org/apache/nifi/json/PropertyNameUtil.java  |  88 +++
 .../org/apache/nifi/json/WriteJsonResult.java   | 309 +++++++++++
 .../serialization/AbstractRecordSetWriter.java  |  84 +++
 .../nifi/serialization/DataTypeUtils.java       | 165 ++++++
 .../SimpleDateFormatValidator.java              |  48 ++
 .../UserTypeOverrideRowReader.java              |  78 +++
 .../nifi/text/FreeFormTextRecordSetWriter.java  |  80 +++
 .../apache/nifi/text/FreeFormTextWriter.java    |  99 ++++
 ...org.apache.nifi.controller.ControllerService |  28 +
 .../main/resources/default-grok-patterns.txt    | 115 ++++
 .../additionalDetails.html                      | 185 +++++++
 .../additionalDetails.html                      | 396 ++++++++++++++
 .../additionalDetails.html                      | 227 ++++++++
 .../additionalDetails.html                      | 102 ++++
 .../apache/nifi/avro/TestAvroRecordReader.java  | 221 ++++++++
 .../apache/nifi/csv/TestCSVRecordReader.java    | 122 +++++
 .../org/apache/nifi/csv/TestWriteCSVResult.java | 121 +++++
 .../apache/nifi/grok/TestGrokRecordReader.java  | 190 +++++++
 .../nifi/json/TestJsonPathRowRecordReader.java  | 292 ++++++++++
 .../nifi/json/TestJsonTreeRowRecordReader.java  | 266 +++++++++
 .../apache/nifi/json/TestWriteJsonResult.java   | 102 ++++
 .../test/resources/csv/extra-white-space.csv    |   9 +
 .../test/resources/csv/multi-bank-account.csv   |   3 +
 .../test/resources/csv/single-bank-account.csv  |   2 +
 .../resources/grok/error-with-stack-trace.log   |  25 +
 ...ifi-log-sample-multiline-with-stacktrace.log |  29 +
 .../src/test/resources/grok/nifi-log-sample.log |   5 +
 .../resources/grok/single-line-log-messages.txt |   5 +
 .../bank-account-array-different-schemas.json   |  30 +
 .../bank-account-array-optional-balance.json    |  29 +
 .../test/resources/json/bank-account-array.json |  21 +
 .../test/resources/json/json-with-unicode.json  |   9 +
 .../test/resources/json/output/dataTypes.json   |  18 +
 .../resources/json/primitive-type-array.json    |  13 +
 .../resources/json/single-bank-account.json     |  10 +
 .../json/single-element-nested-array.json       |  16 +
 .../resources/json/single-element-nested.json   |  13 +
 .../pom.xml                                     |  30 +
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   2 +
 pom.xml                                         |  11 +
 110 files changed, 9838 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 070beba..4a2babb 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1,15 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-license agreements. See the NOTICE file distributed with this work for additional
-information regarding copyright ownership. The ASF licenses this file to
-You under the Apache License, Version 2.0 (the "License"); you may not use
-this file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-by applicable law or agreed to in writing, software distributed under the
-License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, either express or implied. See the License for the specific
-language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -405,6 +406,11 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mqtt-nar</artifactId>
             <type>nar</type>
         </dependency>
@@ -513,13 +519,17 @@ language governing permissions and limitations under the License. -->
                         <configuration>
                             <name>nifi</name>
                             <summary>Apache NiFi</summary>
-                            <description>Apache NiFi is dataflow system based on the Flow-Based Programming concepts.</description>
-                            <license>Apache License, Version 2.0 and others (see included LICENSE file)</license>
+                            <description>Apache NiFi is dataflow system
+                                based on the Flow-Based Programming
+                                concepts.</description>
+                            <license>Apache License, Version 2.0 and
+                                others (see included LICENSE file)</license>
                             <url>http://nifi.apache.org</url>
                             <group>Utilities</group>
                             <prefix>/opt/nifi</prefix>
                             <defineStatements>
-                                <defineStatement>_use_internal_dependency_generator 0</defineStatement>
+                                <defineStatement>_use_internal_dependency_generator
+                                    0</defineStatement>
                             </defineStatements>
                             <defaultDirmode>750</defaultDirmode>
                             <defaultFilemode>640</defaultFilemode>
@@ -536,7 +546,13 @@ language governing permissions and limitations under the License. -->
                             </installScriptlet>
                             <preinstallScriptlet>
                                 <script>
-                                    /usr/bin/getent group nifi &gt;/dev/null || /usr/sbin/groupadd -r nifi; /usr/bin/getent passwd nifi &gt;/dev/null || /usr/sbin/useradd -r -g nifi -d /opt/nifi -s /sbin/nologin -c "NiFi System User" nifi
+                                    /usr/bin/getent group nifi
+                                    &gt;/dev/null || /usr/sbin/groupadd
+                                    -r nifi; /usr/bin/getent passwd nifi
+                                    &gt;/dev/null || /usr/sbin/useradd
+                                    -r -g nifi -d /opt/nifi -s
+                                    /sbin/nologin -c "NiFi System User"
+                                    nifi
                                 </script>
                             </preinstallScriptlet>
                         </configuration>
@@ -602,10 +618,12 @@ language governing permissions and limitations under the License. -->
                                         <mapping>
                                             <directory>/opt/nifi/nifi-${project.version}/lib</directory>
                                         </mapping>
-                                        <!-- The lib excludes and lib/bootstrap includes are computed by looking at the desired contents of
-                                        lib vs the desired contents of bootstrap directories.  The bootstrap directory should be comprised of explicitly
-                                        included items as found from the lib/bootstrap of a non rpm build and the lib folder should be specific excludes
-                                        being those which we want in bootstrap and NOT in lib. -->
+                                        <!-- The lib excludes and lib/bootstrap 
+                                            includes are computed by looking at the desired contents of lib vs the desired 
+                                            contents of bootstrap directories. The bootstrap directory should be comprised 
+                                            of explicitly included items as found from the lib/bootstrap of a non rpm 
+                                            build and the lib folder should be specific excludes being those which we 
+                                            want in bootstrap and NOT in lib. -->
                                         <mapping>
                                             <directory>/opt/nifi/nifi-${project.version}/lib</directory>
                                             <dependency>
@@ -636,7 +654,8 @@ language governing permissions and limitations under the License. -->
                                                     <!-- must be in lib <exclude>ch.qos.logback:logback-core</exclude> -->
                                                     <exclude>org.apache.nifi:nifi-security-utils</exclude>
                                                     <exclude>org.apache.nifi:nifi-utils</exclude>
-                                                    <!-- Items to not include which are also not in bootstrap -->
+                                                    <!-- Items to not include 
+                                                        which are also not in bootstrap -->
                                                     <exclude>org.apache.nifi:nifi-resources</exclude>
                                                     <exclude>org.apache.nifi:nifi-docs</exclude>
                                                 </excludes>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index faf6e42..7dd9714 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -218,8 +218,8 @@ public class MockProcessSession implements ProcessSession {
                 }
             }
 
-            throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
-                + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
+            //            throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
+            //                + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
         }
 
         committed = true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 7c911b8..73dd92f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -239,7 +239,7 @@ public class NodeResponse {
 
         // if no client response was created, then generate a 500 response
         if (hasThrowable()) {
-            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+            return Response.status(Status.INTERNAL_SERVER_ERROR).entity(getThrowable().toString()).build();
         }
 
         // set the status

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fe99fb3..3a51816 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2157,10 +2157,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
         final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-        final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead);
+        final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
         final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
 
         final InputStream errorHandlingStream = new InputStream() {
+            private boolean closed = false;
 
             @Override
             public int read() throws IOException {
@@ -2201,7 +2202,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             @Override
             public void close() throws IOException {
-                StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+                if (!closed) {
+                    StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+                    closed = true;
+                }
 
                 ffais.close();
                 openInputStreams.remove(source);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
index cc17abc..8e92604 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.processor;
 
+import java.util.Arrays;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,16 +49,6 @@ public class SimpleProcessLogger implements ComponentLog {
         return newArgs;
     }
 
-    private Object[] translateException(final Object[] os) {
-        if (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)) {
-            final Object[] osCopy = new Object[os.length];
-            osCopy[osCopy.length - 1] = os[os.length - 1].toString();
-            System.arraycopy(os, 0, osCopy, 0, os.length - 1);
-            return osCopy;
-        }
-        return os;
-    }
-
     private boolean lastArgIsException(final Object[] os) {
         return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable));
     }
@@ -80,7 +72,7 @@ public class SimpleProcessLogger implements ComponentLog {
         }
 
         if (lastArgIsException(os)) {
-            warn(msg, translateException(os), (Throwable) os[os.length - 1]);
+            warn(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]);
         } else {
             msg = "{} " + msg;
             os = addProcessor(os);
@@ -95,13 +87,9 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        os = addProcessorAndThrowable(os, t);
+        os = addProcessorAndThrowable(os, t, logger.isDebugEnabled());
         msg = "{} " + msg + ": {}";
-
         logger.warn(msg, os);
-        if (logger.isDebugEnabled()) {
-            logger.warn("", t);
-        }
         logRepository.addLogMessage(LogLevel.WARN, msg, os, t);
     }
 
@@ -159,11 +147,10 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        os = addProcessorAndThrowable(os, t);
+        os = addProcessorAndThrowable(os, t, true);
         msg = "{} " + msg + ": {}";
 
         logger.trace(msg, os);
-        logger.trace("", t);
         logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
     }
 
@@ -240,13 +227,10 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        os = addProcessorAndThrowable(os, t);
+        os = addProcessorAndThrowable(os, t, logger.isDebugEnabled());
         msg = "{} " + msg + ": {}";
 
         logger.info(msg, os);
-        if (logger.isDebugEnabled()) {
-            logger.info("", t);
-        }
         logRepository.addLogMessage(LogLevel.INFO, msg, os, t);
     }
 
@@ -261,14 +245,16 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        msg = "{} " + msg;
-        Object[] os = t == null ? new Object[]{component} : new Object[]{component, t.toString()};
-        logger.error(msg, os);
-        if (t != null){
-            logger.error("", t);
-            logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
-        } else {
+        if (t == null) {
+            msg = "{} " + msg;
+            final Object[] os = new Object[] {component};
+            logger.error(msg, os);
             logRepository.addLogMessage(LogLevel.ERROR, msg, os);
+        } else {
+            msg = "{} " + msg + ": {}";
+            final Object[] os = new Object[] {component, t.toString(), t};
+            logger.error(msg, os);
+            logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
         }
     }
 
@@ -279,7 +265,7 @@ public class SimpleProcessLogger implements ComponentLog {
         }
 
         if (lastArgIsException(os)) {
-            error(msg, translateException(os), (Throwable) os[os.length - 1]);
+            error(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]);
         } else {
             os = addProcessor(os);
             msg = "{} " + msg;
@@ -299,21 +285,27 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        os = addProcessorAndThrowable(os, t);
+        os = addProcessorAndThrowable(os, t, true);
         msg = "{} " + msg + ": {}";
 
         logger.error(msg, os);
-        logger.error("", t);
         logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
     }
 
-    private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) {
-        final Object[] modifiedArgs = new Object[os.length + 2];
-        modifiedArgs[0] = component.toString();
-        for (int i = 0; i < os.length; i++) {
-            modifiedArgs[i + 1] = os[i];
+    private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t, final boolean includeStackTrace) {
+        final Object[] modifiedArgs;
+        if (t == null || !includeStackTrace) {
+            modifiedArgs = new Object[os.length + 2];
+            modifiedArgs[0] = component.toString();
+            System.arraycopy(os, 0, modifiedArgs, 1, os.length);
+            modifiedArgs[modifiedArgs.length - 1] = StringUtils.EMPTY;
+        } else {
+            modifiedArgs = new Object[os.length + 3];
+            modifiedArgs[0] = component.toString();
+            System.arraycopy(os, 0, modifiedArgs, 1, os.length);
+            modifiedArgs[modifiedArgs.length - 2] = t.toString();
+            modifiedArgs[modifiedArgs.length - 1] = t;
         }
-        modifiedArgs[modifiedArgs.length - 1] = (t == null) ? "" : t.toString();
 
         return modifiedArgs;
     }
@@ -350,13 +342,10 @@ public class SimpleProcessLogger implements ComponentLog {
             return;
         }
 
-        os = addProcessorAndThrowable(os, t);
+        os = addProcessorAndThrowable(os, t, true);
         msg = "{} " + msg + ": {}";
 
         logger.debug(msg, os);
-        if (logger.isDebugEnabled()) {
-            logger.debug("", t);
-        }
         logRepository.addLogMessage(LogLevel.DEBUG, msg, os, t);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index e0d1300..51c6080 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -178,6 +178,22 @@ The following binary components are provided under the Apache Software License v
       Grok
       Copyright 2014 Anthony Corbacho, and contributors.
 
+  (ASLv2) Apache Calcite
+    The following NOTICE information applies:
+		Apache Calcite
+		Copyright 2012-2017 The Apache Software Foundation
+		
+		This product includes software developed at
+		The Apache Software Foundation (http://www.apache.org/).
+		
+		This product is based on source code originally developed
+		by DynamoBI Corporation, LucidEra Inc., SQLstream Inc. and others
+		under the auspices of the Eigenbase Foundation
+		and released as the LucidDB project.
+		
+		The web site includes files generated by Jekyll.
+
+
 ************************
 Common Development and Distribution License 1.1
 ************************

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d410f43..e390097 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,15 +1,16 @@
 <?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-license agreements. See the NOTICE file distributed with this work for additional
-information regarding copyright ownership. The ASF licenses this file to
-You under the Apache License, Version 2.0 (the "License"); you may not use
-this file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-by applicable law or agreed to in writing, software distributed under the
-License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, either express or implied. See the License for the specific
-language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -49,6 +50,10 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-http-context-map-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
deleted file mode 100644
index 0f928ce..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.Pair;
-import org.apache.commons.lang3.time.FastDateFormat;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-
-/** Enumerator that reads from a CSV stream.
- *
- * @param <E> Row type
- */
-class CsvEnumerator2<E> implements Enumerator<E> {
-  private final CSVReader reader;
-  private final String[] filterValues;
-  private final RowConverter<E> rowConverter;
-  private E current;
-
-  private static final FastDateFormat TIME_FORMAT_DATE;
-  private static final FastDateFormat TIME_FORMAT_TIME;
-  private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
-
-  static {
-    TimeZone gmt = TimeZone.getTimeZone("GMT");
-    TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
-    TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
-    TIME_FORMAT_TIMESTAMP =
-        FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
-  }
-
-  public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes) {
-    this(verifyNotNullReader(csvReader), fieldTypes, identityList(fieldTypes.size()));
-  }
-
-  public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes, int[] fields) {
-    //noinspection unchecked
-    this(csvReader, null, (RowConverter<E>) converter(fieldTypes, fields));
-  }
-
-  public CsvEnumerator2(CSVReader csvReader, String[] filterValues, RowConverter<E> rowConverter) {
-    this.rowConverter = rowConverter;
-    this.filterValues = filterValues;
-    this.reader = csvReader;
-  }
-
-  static public CSVReader verifyNotNullReader(CSVReader csvReader) {
-    if (csvReader==null)
-      throw new IllegalArgumentException("csvReader cannot be null");
-    return csvReader;
-  }
-
-  private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
-      int[] fields) {
-    if (fields.length == 1) {
-      final int field = fields[0];
-      return new SingleColumnRowConverter(fieldTypes.get(field), field);
-    } else {
-      return new ArrayRowConverter(fieldTypes, fields);
-    }
-  }
-
-  /** Deduces the names and types of a table's columns by reading the first line
-   * of a CSV stream. */
-  static public RelDataType deduceRowType(JavaTypeFactory typeFactory, String[] firstLine,
-      List<CsvFieldType> fieldTypes) {
-    final List<RelDataType> types = new ArrayList<>();
-    final List<String> names = new ArrayList<>();
-      for (String string : firstLine) {
-        final String name;
-        final CsvFieldType fieldType;
-        final int colon = string.indexOf(':');
-        if (colon >= 0) {
-          name = string.substring(0, colon);
-          String typeString = string.substring(colon + 1);
-          typeString = typeString.trim();
-          fieldType = CsvFieldType.of(typeString);
-          if (fieldType == null) {
-            System.out.println("WARNING: Found unknown type: "
-              + typeString + " in first line: "
-              + " for column: " + name
-              + ". Will assume the type of column is string");
-          }
-        } else {
-          name = string;
-          fieldType = null;
-        }
-        final RelDataType type;
-        if (fieldType == null) {
-          type = typeFactory.createJavaType(String.class);
-        } else {
-          type = fieldType.toType(typeFactory);
-        }
-        names.add(name);
-        types.add(type);
-        if (fieldTypes != null) {
-          fieldTypes.add(fieldType);
-        }
-      }
-
-    if (names.isEmpty()) {
-      names.add("line");
-      types.add(typeFactory.createJavaType(String.class));
-    }
-    return typeFactory.createStructType(Pair.zip(names, types));
-  }
-
-  public E current() {
-    return current;
-  }
-
-  public boolean moveNext() {
-    try {
-    outer:
-      for (;;) {
-        final String[] strings = reader.readNext();
-        if (strings == null) {
-          current = null;
-          reader.close();
-          return false;
-        }
-        if (filterValues != null) {
-          for (int i = 0; i < strings.length; i++) {
-            String filterValue = filterValues[i];
-            if (filterValue != null) {
-              if (!filterValue.equals(strings[i])) {
-                continue outer;
-              }
-            }
-          }
-        }
-        current = rowConverter.convertRow(strings);
-        return true;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void reset() {
-    throw new UnsupportedOperationException();
-  }
-
-  public void close() {
-    try {
-      reader.close();
-    } catch (IOException e) {
-      throw new RuntimeException("Error closing CSV reader", e);
-    }
-  }
-
-  /** Returns an array of integers {0, ..., n - 1}. */
-  static int[] identityList(int n) {
-    int[] integers = new int[n];
-    for (int i = 0; i < n; i++) {
-      integers[i] = i;
-    }
-    return integers;
-  }
-
-  /** Row converter. */
-  abstract static class RowConverter<E> {
-    abstract E convertRow(String[] rows);
-
-    protected Object convert(CsvFieldType fieldType, String string) {
-      if (fieldType == null) {
-        return string;
-      }
-      switch (fieldType) {
-      case BOOLEAN:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Boolean.parseBoolean(string);
-      case BYTE:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Byte.parseByte(string);
-      case SHORT:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Short.parseShort(string);
-      case INT:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Integer.parseInt(string);
-      case LONG:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Long.parseLong(string);
-      case FLOAT:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Float.parseFloat(string);
-      case DOUBLE:
-        if (string.length() == 0) {
-          return null;
-        }
-        return Double.parseDouble(string);
-      case DATE:
-        if (string.length() == 0) {
-          return null;
-        }
-        try {
-          Date date = TIME_FORMAT_DATE.parse(string);
-          return new java.sql.Date(date.getTime());
-        } catch (ParseException e) {
-          return null;
-        }
-      case TIME:
-        if (string.length() == 0) {
-          return null;
-        }
-        try {
-          Date date = TIME_FORMAT_TIME.parse(string);
-          return new java.sql.Time(date.getTime());
-        } catch (ParseException e) {
-          return null;
-        }
-      case TIMESTAMP:
-        if (string.length() == 0) {
-          return null;
-        }
-        try {
-          Date date = TIME_FORMAT_TIMESTAMP.parse(string);
-          return new java.sql.Timestamp(date.getTime());
-        } catch (ParseException e) {
-          return null;
-        }
-      case STRING:
-      default:
-        return string;
-      }
-    }
-  }
-
-  /** Array row converter. */
-  static class ArrayRowConverter extends RowConverter<Object[]> {
-    private final CsvFieldType[] fieldTypes;
-    private final int[] fields;
-
-    ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
-      this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]);
-      this.fields = fields;
-    }
-
-    public Object[] convertRow(String[] strings) {
-      final Object[] objects = new Object[fields.length];
-      for (int i = 0; i < fields.length; i++) {
-        int field = fields[i];
-        objects[i] = convert(fieldTypes[field], strings[field]);
-      }
-      return objects;
-    }
-  }
-
-  /** Single column row converter. */
-  private static class SingleColumnRowConverter extends RowConverter {
-    private final CsvFieldType fieldType;
-    private final int fieldIndex;
-
-    private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
-      this.fieldType = fieldType;
-      this.fieldIndex = fieldIndex;
-    }
-
-    public Object convertRow(String[] strings) {
-      return convert(fieldType, strings[fieldIndex]);
-    }
-  }
-}
-
-// End CsvEnumerator2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
deleted file mode 100644
index f724f79..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.Reader;
-import java.util.Map;
-
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AbstractSchema;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Schema mapped onto a directory of CSV files. Each table in the schema
- * is a CSV file in that directory.
- */
-public class CsvSchema2 extends AbstractSchema {
-  final private Map<String, Reader> inputs;
-  private final CsvTable.Flavor flavor;
-  private Map<String, Table> tableMap;
-
-  /**
-   * Creates a CSV schema.
-   *
-   * @param inputs     Inputs map
-   * @param flavor     Whether to instantiate flavor tables that undergo
-   *                   query optimization
-   */
-  public CsvSchema2(Map<String, Reader> inputs, CsvTable.Flavor flavor) {
-    super();
-    this.inputs = inputs;
-    this.flavor = flavor;
-  }
-
-  /** Looks for a suffix on a string and returns
-   * either the string with the suffix removed
-   * or the original string. */
-  private static String trim(String s, String suffix) {
-    String trimmed = trimOrNull(s, suffix);
-    return trimmed != null ? trimmed : s;
-  }
-
-  /** Looks for a suffix on a string and returns
-   * either the string with the suffix removed
-   * or null. */
-  private static String trimOrNull(String s, String suffix) {
-    return s.endsWith(suffix)
-        ? s.substring(0, s.length() - suffix.length())
-        : null;
-  }
-
-  @Override protected Map<String, Table> getTableMap() {
-
-    if (tableMap!=null)
-      return tableMap;
-
-    // Build a map from table name to table; each file becomes a table.
-    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
-
-    for (Map.Entry<String, Reader> entry : inputs.entrySet()) {
-      final Table table = createTable(entry.getValue());
-      builder.put(entry.getKey(), table);
-    }
-
-    tableMap = builder.build();
-    return tableMap;
-  }
-
-  /** Creates different sub-type of table based on the "flavor" attribute. */
-  private Table createTable(Reader readerx) {
-    switch (flavor) {
-    case TRANSLATABLE:
-      return new CsvTranslatableTable2(readerx, null);
-//    case SCANNABLE:
-//      return new CsvScannableTable(file, null);
-//    case FILTERABLE:
-//      return new CsvFilterableTable(file, null);
-    default:
-      throw new AssertionError("Unknown flavor " + flavor);
-    }
-  }
-}
-
-// End CsvSchema2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
deleted file mode 100644
index f8ec576..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.Reader;
-import java.util.Map;
-
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaFactory;
-import org.apache.calcite.schema.SchemaPlus;
-
-/**
- * Factory that creates a {@link CsvSchema}.
- *
- * <p>Allows a custom schema to be included in a <code><i>model</i>.json</code>
- * file.</p>
- */
-@SuppressWarnings("UnusedDeclaration")
-public class CsvSchemaFactory2 implements SchemaFactory {
-  final private Map<String, Reader> inputs;
-  // public constructor, per factory contract
-  public CsvSchemaFactory2(Map<String, Reader> inputs) {
-      this.inputs = inputs;
-  }
-
-  public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
-    String flavorName = (String) operand.get("flavor");
-    CsvTable.Flavor flavor;
-    if (flavorName == null) {
-      flavor = CsvTable.Flavor.SCANNABLE;
-    } else {
-      flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase());
-    }
-
-    return new CsvSchema2(inputs, flavor);
-  }
-}
-
-// End CsvSchemaFactory2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
deleted file mode 100644
index 75f013c..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.linq4j.tree.Blocks;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-
-import java.util.List;
-
-/**
- * Relational expression representing a scan of a CSV stream.
- *
- * <p>Like any table scan, it serves as a leaf node of a query tree.</p>
- */
-public class CsvTableScan2 extends TableScan implements EnumerableRel {
-  final CsvTranslatableTable2 csvTable;
-  final int[] fields;
-
-  protected CsvTableScan2(RelOptCluster cluster, RelOptTable table,
-      CsvTranslatableTable2 csvTable, int[] fields) {
-    super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
-    this.csvTable = csvTable;
-    this.fields = fields;
-
-    assert csvTable != null;
-  }
-
-  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    assert inputs.isEmpty();
-    return new CsvTableScan2(getCluster(), table, csvTable, fields);
-  }
-
-  @Override public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
-        .item("fields", Primitive.asList(fields));
-  }
-
-  @Override public RelDataType deriveRowType() {
-    final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
-    final RelDataTypeFactory.FieldInfoBuilder builder =
-        getCluster().getTypeFactory().builder();
-    for (int field : fields) {
-      builder.add(fieldList.get(field));
-    }
-    return builder.build();
-  }
-
-  @Override public void register(RelOptPlanner planner) {
-    planner.addRule(CsvProjectTableScanRule.INSTANCE);
-  }
-
-  public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    PhysType physType =
-        PhysTypeImpl.of(
-            implementor.getTypeFactory(),
-            getRowType(),
-            pref.preferArray());
-
-    if (table instanceof JsonTable) {
-      return implementor.result(
-          physType,
-          Blocks.toBlock(
-              Expressions.call(table.getExpression(JsonTable.class),
-                  "enumerable")));
-    }
-    return implementor.result(
-        physType,
-        Blocks.toBlock(
-            Expressions.call(table.getExpression(CsvTranslatableTable2.class),
-                "project", Expressions.constant(fields))));
-  }
-}
-
-// End CsvTableScan.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
deleted file mode 100644
index bc28fdd..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.QueryableTable;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.schema.TranslatableTable;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-
-/**
- * Table based on a CSV stream.
- */
-public class CsvTranslatableTable2 extends CsvTable
-    implements QueryableTable, TranslatableTable {
-
-  final private CSVReader csvReader;
-  private CsvEnumerator2<Object> csvEnumerator2;
-  final private String[] firstLine;
-
-  /** Creates a CsvTable.
-   */
-  CsvTranslatableTable2(Reader readerx, RelProtoDataType protoRowType) {
-    super(null, protoRowType);
-    this.csvReader = new CSVReader(readerx);
-    try {
-        this.firstLine = csvReader.readNext();
-    } catch (IOException e) {
-        throw new RuntimeException("csvReader.readNext() failed ", e);
-    }
-  }
-
-  public String toString() {
-    return "CsvTranslatableTable2";
-  }
-
-  /** Returns an enumerable over a given projection of the fields.
-   *
-   * <p>Called from generated code. */
-  public Enumerable<Object> project(final int[] fields) {
-    return new AbstractEnumerable<Object>() {
-      public Enumerator<Object> enumerator() {
-        return csvEnumerator2;
-      }
-    };
-  }
-
-  public Expression getExpression(SchemaPlus schema, String tableName,
-      Class clazz) {
-    return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
-  }
-
-  public Type getElementType() {
-    return Object[].class;
-  }
-
-  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
-      SchemaPlus schema, String tableName) {
-    throw new UnsupportedOperationException();
-  }
-
-  public RelNode toRel(
-      RelOptTable.ToRelContext context,
-      RelOptTable relOptTable) {
-    // Request all fields.
-    final int fieldCount = relOptTable.getRowType().getFieldCount();
-    final int[] fields = CsvEnumerator.identityList(fieldCount);
-    return new CsvTableScan2(context.getCluster(), relOptTable, this, fields);
-  }
-
-  @Override
-  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      RelDataType rowType = null;
-
-      if (fieldTypes == null) {
-          fieldTypes = new ArrayList<CsvFieldType>();
-          rowType =  CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, fieldTypes);
-      } else {
-          rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, null);
-      }
-
-      if (csvEnumerator2==null)
-          csvEnumerator2 = new CsvEnumerator2<Object>(csvReader, fieldTypes);
-
-          return rowType;
-      }
-}
-
-// End CsvTranslatableTable2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
deleted file mode 100644
index 718f462..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import static java.sql.Types.CHAR;
-import static java.sql.Types.LONGNVARCHAR;
-import static java.sql.Types.LONGVARCHAR;
-import static java.sql.Types.NCHAR;
-import static java.sql.Types.NVARCHAR;
-import static java.sql.Types.VARCHAR;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.calcite.adapter.csv.CsvSchemaFactory2;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.util.StopWatch;
-
-import com.google.common.collect.ImmutableMap;
-
-@EventDriven
-@SideEffectFree
-@SupportsBatching
-@Tags({"xml", "xslt", "transform"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported."
-        + "Columns can be renamed, simple calculations performed, aggregations, etc."
-        + "SQL select statement is used to specify how CSV data should be transformed."
-        + "SQL statement follows standard SQL, some restrictions may apply."
-        + "Successfully transformed CSV data is routed to the 'success' relationship."
-        + "If transform fails, the original FlowFile is routed to the 'failure' relationship")
-public class FilterCSVColumns  extends AbstractProcessor {
-
-    public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
-            .name("SQL select statement")
-            .description("SQL select statement specifies how CSV data should be transformed. "
-                       + "Sql select should select from CSV.A table")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("The FlowFile with transformed content will be routed to this relationship")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship")
-            .build();
-
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(SQL_SELECT);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile original = session.get();
-        if (original == null) {
-            return;
-        }
-
-        final ProcessorLog logger = getLogger();
-        final StopWatch stopWatch = new StopWatch(true);
-
-        try {
-            FlowFile transformed = session.write(original, new StreamCallback() {
-                @Override
-                public void process(final InputStream rawIn, final OutputStream out) throws IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn)) {
-
-                        String sql = context.getProperty(SQL_SELECT).getValue();
-                        final ResultSet resultSet = transform(rawIn, sql);
-                        convertToCSV(resultSet, out);
-
-                    } catch (final Exception e) {
-                        throw new IOException(e);
-                    }
-                }
-            });
-            session.transfer(transformed, REL_SUCCESS);
-            session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            logger.info("Transformed {}", new Object[]{original});
-        } catch (ProcessException e) {
-            logger.error("Unable to transform {} due to {}", new Object[]{original, e});
-            session.transfer(original, REL_FAILURE);
-        }
-    }
-
-    static protected ResultSet transform(InputStream rawIn, String sql) throws SQLException {
-
-        Reader readerx = new InputStreamReader(rawIn);
-        HashMap<String, Reader> inputs = new HashMap<>();
-        inputs.put("A", readerx);
-
-        Statement statement = null;
-        final Properties properties = new Properties();
-//      properties.setProperty("caseSensitive", "true");
-        try (final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties)) {
-            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-
-            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
-            final Schema schema =
-              new CsvSchemaFactory2(inputs)
-                  .create(rootSchema, "CSV", ImmutableMap.<String, Object>of("flavor", "TRANSLATABLE"));
-
-            calciteConnection.getRootSchema().add("CSV", schema);
-            rootSchema.add("default", schema);
-
-            statement = connection.createStatement();
-            final ResultSet resultSet = statement.executeQuery(sql);
-            return resultSet;
-        }
-    }
-
-    static protected void convertToCSV(ResultSet resultSet, OutputStream out) throws SQLException, IOException {
-
-        convertToCsvStream(resultSet, out);
-    }
-
-    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
-        return convertToCsvStream(rs, outStream, null, null);
-    }
-
-    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
-            throws SQLException, IOException {
-
-        final ResultSetMetaData meta = rs.getMetaData();
-        final int nrOfColumns = meta.getColumnCount();
-        List<String> columnNames = new ArrayList<>(nrOfColumns);
-
-        for (int i = 1; i <= nrOfColumns; i++) {
-            String columnNameFromMeta = meta.getColumnName(i);
-            // Hive returns table.column for column name. Grab the column name as the string after the last period
-            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
-            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
-        }
-
-        // Write column names as header row
-        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
-        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
-
-        // Iterate over the rows
-        long nrOfRows = 0;
-        while (rs.next()) {
-            if (callback != null) {
-                callback.processRow(rs);
-            }
-            List<String> rowValues = new ArrayList<>(nrOfColumns);
-            for (int i = 1; i <= nrOfColumns; i++) {
-                final int javaSqlType = meta.getColumnType(i);
-                final Object value = rs.getObject(i);
-
-                switch (javaSqlType) {
-                    case CHAR:
-                    case LONGNVARCHAR:
-                    case LONGVARCHAR:
-                    case NCHAR:
-                    case NVARCHAR:
-                    case VARCHAR:
-                        rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\"");
-                        break;
-                    default:
-                        rowValues.add(value.toString());
-                }
-            }
-            // Write row values
-            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
-            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
-            nrOfRows++;
-        }
-        return nrOfRows;
-    }
-
-    /**
-     * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing.
-     * <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
-     * Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation.
-     */
-    public interface ResultSetRowCallback {
-        void processRow(ResultSet resultSet) throws IOException;
-    }
-}


Mime
View raw message