From: mattyb149@apache.org
To: commits@nifi.apache.org
Reply-To: dev@nifi.apache.org
Date: Tue, 11 Apr 2017 23:33:32 -0000
Message-Id: <5ee07fe8f67c4d1fa650454cfd70776b@git.apache.org>
In-Reply-To: <01db5f0048964b4ea67d552abdf01eb0@git.apache.org>
References: <01db5f0048964b4ea67d552abdf01eb0@git.apache.org>
Subject: [09/19] nifi git commit: NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required.
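The subject line's "InputStreams to content on-demand" idea — obtain a fresh stream for each pass instead of reusing one that an earlier pass has already exhausted — can be illustrated with a standalone, JDK-only sketch. The names here are illustrative; this is not NiFi's actual ProcessSession API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class MultiPassDemo {

    // Counts the bytes in one full pass; each call obtains a brand-new
    // stream from the supplier, so earlier passes cannot exhaust it.
    static int countBytes(Supplier<InputStream> content) {
        int n = 0;
        try (InputStream in = content.get()) {
            while (in.read() != -1) {
                n++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return n;
    }

    public static void main(String[] args) {
        byte[] data = "record-1\nrecord-2\n".getBytes();
        // On-demand creation: the supplier hands out a fresh InputStream
        // for every pass over the same underlying content.
        Supplier<InputStream> content = () -> new ByteArrayInputStream(data);
        System.out.println(countBytes(content)); // first pass
        System.out.println(countBytes(content)); // second pass succeeds too
    }
}
```

Both passes read all 18 bytes because neither reuses a consumed stream; with a single shared stream, the second pass would see end-of-stream immediately.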
Created new Controller Services
Archived-At: Tue, 11 Apr 2017 23:33:29 -0000

NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types

Signed-off-by: Matt Burgess

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a88d3bfa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a88d3bfa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a88d3bfa
Branch: refs/heads/master
Commit: a88d3bfa3c53d9cbe375f2b89eaa9248eb92df29
Parents: 4d5872a
Author: Mark Payne
Authored: Mon Jul 11 14:57:00 2016 -0400
Committer: Matt Burgess
Committed: Tue Apr 11 19:29:04 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  59 +-
 .../apache/nifi/util/MockProcessSession.java    |   4 +-
 .../nifi/cluster/manager/NodeResponse.java      |   2 +-
 .../repository/StandardProcessSession.java      |   8 +-
 .../nifi/processor/SimpleProcessLogger.java     |  75 ++-
 .../src/main/resources/META-INF/NOTICE         |  16 +
 .../nifi-standard-processors/pom.xml            |  27 +-
 .../calcite/adapter/csv/CsvEnumerator2.java     | 303 -----
 .../apache/calcite/adapter/csv/CsvSchema2.java  |  98 ----
 .../calcite/adapter/csv/CsvSchemaFactory2.java  |  53 --
 .../calcite/adapter/csv/CsvTableScan2.java      | 104 ----
 .../adapter/csv/CsvTranslatableTable2.java      | 121 -----
 .../processors/standard/FilterCSVColumns.java   | 258 ---------
 .../nifi/processors/standard/QueryFlowFile.java | 541 +++++++++++++++++++
 .../nifi/queryflowfile/FlowFileEnumerator.java  | 150 +++++
 .../FlowFileProjectTableScanRule.java           |  76 +++
 .../nifi/queryflowfile/FlowFileTable.java       | 203 +++++++
 .../nifi/queryflowfile/FlowFileTableScan.java   |  91 ++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |  47 ++
 .../standard/TestFilterCSVColumns.java          | 117 ----
 .../processors/standard/TestQueryFlowFile.java  | 379 +++++++++++++
 .../resources/TestFilterCSVColumns/Numeric.csv  |   5 -
 .../resources/TestFilterCSVColumns/US500.csv    |   1 -
 .../TestFilterCSVColumns/US500_typeless.csv     |   1 -
 .../pom.xml                                     |  31 ++
 .../nifi/serialization/DataTypeValidator.java   |  82 +++
 .../serialization/MalformedRecordException.java |  31 ++
 .../apache/nifi/serialization/RecordReader.java |  55 ++
 .../nifi/serialization/RecordSetWriter.java     |  45 ++
 .../serialization/RecordSetWriterFactory.java   |  30 +
 .../apache/nifi/serialization/RecordWriter.java |  41 ++
 .../serialization/RowRecordReaderFactory.java   |  33 ++
 .../nifi/serialization/SimpleRecordSchema.java  | 126 +++++
 .../apache/nifi/serialization/WriteResult.java  |  69 +++
 .../nifi/serialization/record/DataType.java     |  95 ++++
 .../serialization/record/ListRecordSet.java     |  44 ++
 .../nifi/serialization/record/MapRecord.java    | 322 +++++++++++
 .../nifi/serialization/record/Record.java       |  62 +++
 .../nifi/serialization/record/RecordField.java  |  64 +++
 .../serialization/record/RecordFieldType.java   | 114 ++++
 .../nifi/serialization/record/RecordSchema.java |  58 ++
 .../nifi/serialization/record/RecordSet.java    |  53 ++
 .../record/ResultSetRecordSet.java              | 169 ++++++
 .../record/TypeMismatchException.java           |  28 +
 .../pom.xml                                     |  41 ++
 .../src/main/resources/META-INF/LICENSE         | 269 +++++++++
 .../src/main/resources/META-INF/NOTICE          |  77 +++
 .../.gitignore                                  |   1 +
 .../nifi-record-serialization-services/pom.xml  |  94 ++++
 .../java/org/apache/nifi/avro/AvroReader.java   |  40 ++
 .../org/apache/nifi/avro/AvroRecordReader.java  | 254 +++++++++
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  67 +++
 .../apache/nifi/avro/AvroSchemaValidator.java   |  45 ++
 .../org/apache/nifi/avro/WriteAvroResult.java   | 286 ++++++++++
 .../java/org/apache/nifi/csv/CSVReader.java     |  49 ++
 .../org/apache/nifi/csv/CSVRecordReader.java    | 216 ++++++++
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |  37 ++
 .../org/apache/nifi/csv/WriteCSVResult.java     | 127 +++++
 .../nifi/grok/GrokExpressionValidator.java      |  48 ++
 .../java/org/apache/nifi/grok/GrokReader.java   |  99 ++++
 .../org/apache/nifi/grok/GrokRecordReader.java  | 323 +++++++++++
 .../nifi/json/AbstractJsonRowRecordReader.java  | 307 +++++++++++
 .../org/apache/nifi/json/JsonPathReader.java    | 126 +++++
 .../nifi/json/JsonPathRowRecordReader.java      | 280 ++++++++++
 .../org/apache/nifi/json/JsonPathValidator.java |  60 ++
 .../apache/nifi/json/JsonRecordSetWriter.java   |  66 +++
 .../org/apache/nifi/json/JsonTreeReader.java    |  56 ++
 .../nifi/json/JsonTreeRowRecordReader.java      | 115 ++++
 .../org/apache/nifi/json/PropertyNameUtil.java  |  88 +++
 .../org/apache/nifi/json/WriteJsonResult.java   | 309 +++++++++++
 .../serialization/AbstractRecordSetWriter.java  |  84 +++
 .../nifi/serialization/DataTypeUtils.java       | 165 ++++++
 .../SimpleDateFormatValidator.java              |  48 ++
 .../UserTypeOverrideRowReader.java              |  78 +++
 .../nifi/text/FreeFormTextRecordSetWriter.java  |  80 +++
 .../apache/nifi/text/FreeFormTextWriter.java    |  99 ++++
 ...org.apache.nifi.controller.ControllerService |  28 +
 .../main/resources/default-grok-patterns.txt    | 115 ++++
 .../additionalDetails.html                      | 185 +++++++
 .../additionalDetails.html                      | 396 ++++++++++++++
 .../additionalDetails.html                      | 227 ++++++++
 .../additionalDetails.html                      | 102 ++++
 .../apache/nifi/avro/TestAvroRecordReader.java  | 221 ++++++++
 .../apache/nifi/csv/TestCSVRecordReader.java    | 122 +++++
 .../org/apache/nifi/csv/TestWriteCSVResult.java | 121 +++++
 .../apache/nifi/grok/TestGrokRecordReader.java  | 190 +++++++
 .../nifi/json/TestJsonPathRowRecordReader.java  | 292 ++++++++++
 .../nifi/json/TestJsonTreeRowRecordReader.java  | 266 +++++++++
 .../apache/nifi/json/TestWriteJsonResult.java   | 102 ++++
 .../test/resources/csv/extra-white-space.csv    |   9 +
 .../test/resources/csv/multi-bank-account.csv   |   3 +
 .../test/resources/csv/single-bank-account.csv  |   2 +
 .../resources/grok/error-with-stack-trace.log   |  25 +
 ...ifi-log-sample-multiline-with-stacktrace.log |  29 +
 .../src/test/resources/grok/nifi-log-sample.log |   5 +
 .../resources/grok/single-line-log-messages.txt |   5 +
 .../bank-account-array-different-schemas.json   |  30 +
 .../bank-account-array-optional-balance.json    |  29 +
 .../test/resources/json/bank-account-array.json |  21 +
 .../test/resources/json/json-with-unicode.json  |   9 +
 .../test/resources/json/output/dataTypes.json   |  18 +
 .../resources/json/primitive-type-array.json    |  13 +
 .../resources/json/single-bank-account.json     |  10 +
 .../json/single-element-nested-array.json       |  16 +
 .../resources/json/single-element-nested.json   |  13 +
 .../pom.xml                                     |  30 +
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   2 +
 pom.xml                                         |  11 +
 110 files changed, 9838 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 070beba..4a2babb 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1,15 +1,16 @@ - - + + 4.0.0 org.apache.nifi
@@ -405,6 +406,11 @@ language governing permissions and limitations under the License. --> org.apache.nifi + nifi-record-serialization-services-nar + nar + + + org.apache.nifi nifi-mqtt-nar nar
@@ -513,13 +519,17 @@ language governing permissions and limitations under the License. --> nifi Apache NiFi - Apache NiFi is dataflow system based on the Flow-Based Programming concepts. - Apache License, Version 2.0 and others (see included LICENSE file) + Apache NiFi is dataflow system + based on the Flow-Based Programming + concepts.
+ Apache License, Version 2.0 and + others (see included LICENSE file) http://nifi.apache.org Utilities /opt/nifi - _use_internal_dependency_generator 0 + _use_internal_dependency_generator + 0 750 640 @@ -536,7 +546,13 @@ language governing permissions and limitations under the License. --> @@ -602,10 +618,12 @@ language governing permissions and limitations under the License. --> /opt/nifi/nifi-${project.version}/lib - + /opt/nifi/nifi-${project.version}/lib @@ -636,7 +654,8 @@ language governing permissions and limitations under the License. --> org.apache.nifi:nifi-security-utils org.apache.nifi:nifi-utils - + org.apache.nifi:nifi-resources org.apache.nifi:nifi-docs http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index faf6e42..7dd9714 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -218,8 +218,8 @@ public class MockProcessSession implements ProcessSession { } } - throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " - + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); + // throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " + // + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); } committed = true; http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java index 7c911b8..73dd92f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java @@ -239,7 +239,7 @@ public class NodeResponse { // if no client response was created, then generate a 500 response if (hasThrowable()) { - return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(getThrowable().toString()).build(); } // set the status http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index fe99fb3..3a51816 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2157,10 +2157,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final 
InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn); final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); final InputStream errorHandlingStream = new InputStream() { + private boolean closed = false; @Override public int read() throws IOException { @@ -2201,7 +2202,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void close() throws IOException { - StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); + if (!closed) { + StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); + closed = true; + } ffais.close(); openInputStreams.remove(source); http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java index cc17abc..8e92604 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.processor; +import java.util.Arrays; 
+ +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,16 +49,6 @@ public class SimpleProcessLogger implements ComponentLog { return newArgs; } - private Object[] translateException(final Object[] os) { - if (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)) { - final Object[] osCopy = new Object[os.length]; - osCopy[osCopy.length - 1] = os[os.length - 1].toString(); - System.arraycopy(os, 0, osCopy, 0, os.length - 1); - return osCopy; - } - return os; - } - private boolean lastArgIsException(final Object[] os) { return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)); } @@ -80,7 +72,7 @@ public class SimpleProcessLogger implements ComponentLog { } if (lastArgIsException(os)) { - warn(msg, translateException(os), (Throwable) os[os.length - 1]); + warn(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]); } else { msg = "{} " + msg; os = addProcessor(os); @@ -95,13 +87,9 @@ public class SimpleProcessLogger implements ComponentLog { return; } - os = addProcessorAndThrowable(os, t); + os = addProcessorAndThrowable(os, t, logger.isDebugEnabled()); msg = "{} " + msg + ": {}"; - logger.warn(msg, os); - if (logger.isDebugEnabled()) { - logger.warn("", t); - } logRepository.addLogMessage(LogLevel.WARN, msg, os, t); } @@ -159,11 +147,10 @@ public class SimpleProcessLogger implements ComponentLog { return; } - os = addProcessorAndThrowable(os, t); + os = addProcessorAndThrowable(os, t, true); msg = "{} " + msg + ": {}"; logger.trace(msg, os); - logger.trace("", t); logRepository.addLogMessage(LogLevel.TRACE, msg, os, t); } @@ -240,13 +227,10 @@ public class SimpleProcessLogger implements ComponentLog { return; } - os = addProcessorAndThrowable(os, t); + os = 
addProcessorAndThrowable(os, t, logger.isDebugEnabled()); msg = "{} " + msg + ": {}"; logger.info(msg, os); - if (logger.isDebugEnabled()) { - logger.info("", t); - } logRepository.addLogMessage(LogLevel.INFO, msg, os, t); } @@ -261,14 +245,16 @@ public class SimpleProcessLogger implements ComponentLog { return; } - msg = "{} " + msg; - Object[] os = t == null ? new Object[]{component} : new Object[]{component, t.toString()}; - logger.error(msg, os); - if (t != null){ - logger.error("", t); - logRepository.addLogMessage(LogLevel.ERROR, msg, os, t); - } else { + if (t == null) { + msg = "{} " + msg; + final Object[] os = new Object[] {component}; + logger.error(msg, os); logRepository.addLogMessage(LogLevel.ERROR, msg, os); + } else { + msg = "{} " + msg + ": {}"; + final Object[] os = new Object[] {component, t.toString(), t}; + logger.error(msg, os); + logRepository.addLogMessage(LogLevel.ERROR, msg, os, t); } } @@ -279,7 +265,7 @@ public class SimpleProcessLogger implements ComponentLog { } if (lastArgIsException(os)) { - error(msg, translateException(os), (Throwable) os[os.length - 1]); + error(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]); } else { os = addProcessor(os); msg = "{} " + msg; @@ -299,21 +285,27 @@ public class SimpleProcessLogger implements ComponentLog { return; } - os = addProcessorAndThrowable(os, t); + os = addProcessorAndThrowable(os, t, true); msg = "{} " + msg + ": {}"; logger.error(msg, os); - logger.error("", t); logRepository.addLogMessage(LogLevel.ERROR, msg, os, t); } - private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) { - final Object[] modifiedArgs = new Object[os.length + 2]; - modifiedArgs[0] = component.toString(); - for (int i = 0; i < os.length; i++) { - modifiedArgs[i + 1] = os[i]; + private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t, final boolean includeStackTrace) { + final Object[] modifiedArgs; + if (t == null || !includeStackTrace) { 
+ modifiedArgs = new Object[os.length + 2]; + modifiedArgs[0] = component.toString(); + System.arraycopy(os, 0, modifiedArgs, 1, os.length); + modifiedArgs[modifiedArgs.length - 1] = StringUtils.EMPTY; + } else { + modifiedArgs = new Object[os.length + 3]; + modifiedArgs[0] = component.toString(); + System.arraycopy(os, 0, modifiedArgs, 1, os.length); + modifiedArgs[modifiedArgs.length - 2] = t.toString(); + modifiedArgs[modifiedArgs.length - 1] = t; } - modifiedArgs[modifiedArgs.length - 1] = (t == null) ? "" : t.toString(); return modifiedArgs; } @@ -350,13 +342,10 @@ public class SimpleProcessLogger implements ComponentLog { return; } - os = addProcessorAndThrowable(os, t); + os = addProcessorAndThrowable(os, t, true); msg = "{} " + msg + ": {}"; logger.debug(msg, os); - if (logger.isDebugEnabled()) { - logger.debug("", t); - } logRepository.addLogMessage(LogLevel.DEBUG, msg, os, t); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE index e0d1300..51c6080 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE @@ -178,6 +178,22 @@ The following binary components are provided under the Apache Software License v Grok Copyright 2014 Anthony Corbacho, and contributors. + (ASLv2) Apache Calcite + The following NOTICE information applies: + Apache Calcite + Copyright 2012-2017 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). 
+ + This product is based on source code originally developed + by DynamoBI Corporation, LucidEra Inc., SQLstream Inc. and others + under the auspices of the Eigenbase Foundation + and released as the LucidDB project. + + The web site includes files generated by Jekyll. + + ************************ Common Development and Distribution License 1.1 ************************ http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index d410f43..e390097 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -1,15 +1,16 @@ - - + + 4.0.0 org.apache.nifi @@ -49,6 +50,10 @@ language governing permissions and limitations under the License. 
--> nifi-http-context-map-api + org.apache.nifi + nifi-record-serialization-service-api + + commons-io commons-io http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java deleted file mode 100644 index 0f928ce..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.adapter.csv; - -import java.io.IOException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.TimeZone; - -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.util.Pair; -import org.apache.commons.lang3.time.FastDateFormat; - -import au.com.bytecode.opencsv.CSVReader; - - -/** Enumerator that reads from a CSV stream. - * - * @param Row type - */ -class CsvEnumerator2 implements Enumerator { - private final CSVReader reader; - private final String[] filterValues; - private final RowConverter rowConverter; - private E current; - - private static final FastDateFormat TIME_FORMAT_DATE; - private static final FastDateFormat TIME_FORMAT_TIME; - private static final FastDateFormat TIME_FORMAT_TIMESTAMP; - - static { - TimeZone gmt = TimeZone.getTimeZone("GMT"); - TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt); - TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt); - TIME_FORMAT_TIMESTAMP = - FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); - } - - public CsvEnumerator2(CSVReader csvReader, List fieldTypes) { - this(verifyNotNullReader(csvReader), fieldTypes, identityList(fieldTypes.size())); - } - - public CsvEnumerator2(CSVReader csvReader, List fieldTypes, int[] fields) { - //noinspection unchecked - this(csvReader, null, (RowConverter) converter(fieldTypes, fields)); - } - - public CsvEnumerator2(CSVReader csvReader, String[] filterValues, RowConverter rowConverter) { - this.rowConverter = rowConverter; - this.filterValues = filterValues; - this.reader = csvReader; - } - - static public CSVReader verifyNotNullReader(CSVReader csvReader) { - if (csvReader==null) - throw new IllegalArgumentException("csvReader cannot be null"); - return csvReader; - } - - private static RowConverter converter(List 
fieldTypes, - int[] fields) { - if (fields.length == 1) { - final int field = fields[0]; - return new SingleColumnRowConverter(fieldTypes.get(field), field); - } else { - return new ArrayRowConverter(fieldTypes, fields); - } - } - - /** Deduces the names and types of a table's columns by reading the first line - * of a CSV stream. */ - static public RelDataType deduceRowType(JavaTypeFactory typeFactory, String[] firstLine, - List fieldTypes) { - final List types = new ArrayList<>(); - final List names = new ArrayList<>(); - for (String string : firstLine) { - final String name; - final CsvFieldType fieldType; - final int colon = string.indexOf(':'); - if (colon >= 0) { - name = string.substring(0, colon); - String typeString = string.substring(colon + 1); - typeString = typeString.trim(); - fieldType = CsvFieldType.of(typeString); - if (fieldType == null) { - System.out.println("WARNING: Found unknown type: " - + typeString + " in first line: " - + " for column: " + name - + ". Will assume the type of column is string"); - } - } else { - name = string; - fieldType = null; - } - final RelDataType type; - if (fieldType == null) { - type = typeFactory.createJavaType(String.class); - } else { - type = fieldType.toType(typeFactory); - } - names.add(name); - types.add(type); - if (fieldTypes != null) { - fieldTypes.add(fieldType); - } - } - - if (names.isEmpty()) { - names.add("line"); - types.add(typeFactory.createJavaType(String.class)); - } - return typeFactory.createStructType(Pair.zip(names, types)); - } - - public E current() { - return current; - } - - public boolean moveNext() { - try { - outer: - for (;;) { - final String[] strings = reader.readNext(); - if (strings == null) { - current = null; - reader.close(); - return false; - } - if (filterValues != null) { - for (int i = 0; i < strings.length; i++) { - String filterValue = filterValues[i]; - if (filterValue != null) { - if (!filterValue.equals(strings[i])) { - continue outer; - } - } - } - } - current = 
rowConverter.convertRow(strings); - return true; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void reset() { - throw new UnsupportedOperationException(); - } - - public void close() { - try { - reader.close(); - } catch (IOException e) { - throw new RuntimeException("Error closing CSV reader", e); - } - } - - /** Returns an array of integers {0, ..., n - 1}. */ - static int[] identityList(int n) { - int[] integers = new int[n]; - for (int i = 0; i < n; i++) { - integers[i] = i; - } - return integers; - } - - /** Row converter. */ - abstract static class RowConverter { - abstract E convertRow(String[] rows); - - protected Object convert(CsvFieldType fieldType, String string) { - if (fieldType == null) { - return string; - } - switch (fieldType) { - case BOOLEAN: - if (string.length() == 0) { - return null; - } - return Boolean.parseBoolean(string); - case BYTE: - if (string.length() == 0) { - return null; - } - return Byte.parseByte(string); - case SHORT: - if (string.length() == 0) { - return null; - } - return Short.parseShort(string); - case INT: - if (string.length() == 0) { - return null; - } - return Integer.parseInt(string); - case LONG: - if (string.length() == 0) { - return null; - } - return Long.parseLong(string); - case FLOAT: - if (string.length() == 0) { - return null; - } - return Float.parseFloat(string); - case DOUBLE: - if (string.length() == 0) { - return null; - } - return Double.parseDouble(string); - case DATE: - if (string.length() == 0) { - return null; - } - try { - Date date = TIME_FORMAT_DATE.parse(string); - return new java.sql.Date(date.getTime()); - } catch (ParseException e) { - return null; - } - case TIME: - if (string.length() == 0) { - return null; - } - try { - Date date = TIME_FORMAT_TIME.parse(string); - return new java.sql.Time(date.getTime()); - } catch (ParseException e) { - return null; - } - case TIMESTAMP: - if (string.length() == 0) { - return null; - } - try { - Date date = 
TIME_FORMAT_TIMESTAMP.parse(string); - return new java.sql.Timestamp(date.getTime()); - } catch (ParseException e) { - return null; - } - case STRING: - default: - return string; - } - } - } - - /** Array row converter. */ - static class ArrayRowConverter extends RowConverter { - private final CsvFieldType[] fieldTypes; - private final int[] fields; - - ArrayRowConverter(List fieldTypes, int[] fields) { - this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]); - this.fields = fields; - } - - public Object[] convertRow(String[] strings) { - final Object[] objects = new Object[fields.length]; - for (int i = 0; i < fields.length; i++) { - int field = fields[i]; - objects[i] = convert(fieldTypes[field], strings[field]); - } - return objects; - } - } - - /** Single column row converter. */ - private static class SingleColumnRowConverter extends RowConverter { - private final CsvFieldType fieldType; - private final int fieldIndex; - - private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) { - this.fieldType = fieldType; - this.fieldIndex = fieldIndex; - } - - public Object convertRow(String[] strings) { - return convert(fieldType, strings[fieldIndex]); - } - } -} - -// End CsvEnumerator2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java deleted file mode 100644 index f724f79..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.csv; - -import java.io.Reader; -import java.util.Map; - -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractSchema; - -import com.google.common.collect.ImmutableMap; - -/** - * Schema mapped onto a directory of CSV files. Each table in the schema - * is a CSV file in that directory. - */ -public class CsvSchema2 extends AbstractSchema { - final private Map inputs; - private final CsvTable.Flavor flavor; - private Map tableMap; - - /** - * Creates a CSV schema. - * - * @param inputs Inputs map - * @param flavor Whether to instantiate flavor tables that undergo - * query optimization - */ - public CsvSchema2(Map inputs, CsvTable.Flavor flavor) { - super(); - this.inputs = inputs; - this.flavor = flavor; - } - - /** Looks for a suffix on a string and returns - * either the string with the suffix removed - * or the original string. */ - private static String trim(String s, String suffix) { - String trimmed = trimOrNull(s, suffix); - return trimmed != null ? trimmed : s; - } - - /** Looks for a suffix on a string and returns - * either the string with the suffix removed - * or null. 
*/ - private static String trimOrNull(String s, String suffix) { - return s.endsWith(suffix) - ? s.substring(0, s.length() - suffix.length()) - : null; - } - - @Override protected Map getTableMap() { - - if (tableMap!=null) - return tableMap; - - // Build a map from table name to table; each file becomes a table. - final ImmutableMap.Builder builder = ImmutableMap.builder(); - - for (Map.Entry entry : inputs.entrySet()) { - final Table table = createTable(entry.getValue()); - builder.put(entry.getKey(), table); - } - - tableMap = builder.build(); - return tableMap; - } - - /** Creates different sub-type of table based on the "flavor" attribute. */ - private Table createTable(Reader readerx) { - switch (flavor) { - case TRANSLATABLE: - return new CsvTranslatableTable2(readerx, null); -// case SCANNABLE: -// return new CsvScannableTable(file, null); -// case FILTERABLE: -// return new CsvFilterableTable(file, null); - default: - throw new AssertionError("Unknown flavor " + flavor); - } - } -} - -// End CsvSchema2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java deleted file mode 100644 index f8ec576..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.csv; - -import java.io.Reader; -import java.util.Map; - -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaFactory; -import org.apache.calcite.schema.SchemaPlus; - -/** - * Factory that creates a {@link CsvSchema}. - * - *
<p>Allows a custom schema to be included in a model.json - * file.
- */ -@SuppressWarnings("UnusedDeclaration") -public class CsvSchemaFactory2 implements SchemaFactory { - final private Map inputs; - // public constructor, per factory contract - public CsvSchemaFactory2(Map inputs) { - this.inputs = inputs; - } - - public Schema create(SchemaPlus parentSchema, String name, Map operand) { - String flavorName = (String) operand.get("flavor"); - CsvTable.Flavor flavor; - if (flavorName == null) { - flavor = CsvTable.Flavor.SCANNABLE; - } else { - flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase()); - } - - return new CsvSchema2(inputs, flavor); - } -} - -// End CsvSchemaFactory2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java deleted file mode 100644 index 75f013c..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.csv; - -import org.apache.calcite.adapter.enumerable.EnumerableConvention; -import org.apache.calcite.adapter.enumerable.EnumerableRel; -import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; -import org.apache.calcite.adapter.enumerable.PhysType; -import org.apache.calcite.adapter.enumerable.PhysTypeImpl; -import org.apache.calcite.linq4j.tree.Blocks; -import org.apache.calcite.linq4j.tree.Expressions; -import org.apache.calcite.linq4j.tree.Primitive; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; - -import java.util.List; - -/** - * Relational expression representing a scan of a CSV stream. - * - *
<p>Like any table scan, it serves as a leaf node of a query tree.
- */ -public class CsvTableScan2 extends TableScan implements EnumerableRel { - final CsvTranslatableTable2 csvTable; - final int[] fields; - - protected CsvTableScan2(RelOptCluster cluster, RelOptTable table, - CsvTranslatableTable2 csvTable, int[] fields) { - super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table); - this.csvTable = csvTable; - this.fields = fields; - - assert csvTable != null; - } - - @Override public RelNode copy(RelTraitSet traitSet, List inputs) { - assert inputs.isEmpty(); - return new CsvTableScan2(getCluster(), table, csvTable, fields); - } - - @Override public RelWriter explainTerms(RelWriter pw) { - return super.explainTerms(pw) - .item("fields", Primitive.asList(fields)); - } - - @Override public RelDataType deriveRowType() { - final List fieldList = table.getRowType().getFieldList(); - final RelDataTypeFactory.FieldInfoBuilder builder = - getCluster().getTypeFactory().builder(); - for (int field : fields) { - builder.add(fieldList.get(field)); - } - return builder.build(); - } - - @Override public void register(RelOptPlanner planner) { - planner.addRule(CsvProjectTableScanRule.INSTANCE); - } - - public Result implement(EnumerableRelImplementor implementor, Prefer pref) { - PhysType physType = - PhysTypeImpl.of( - implementor.getTypeFactory(), - getRowType(), - pref.preferArray()); - - if (table instanceof JsonTable) { - return implementor.result( - physType, - Blocks.toBlock( - Expressions.call(table.getExpression(JsonTable.class), - "enumerable"))); - } - return implementor.result( - physType, - Blocks.toBlock( - Expressions.call(table.getExpression(CsvTranslatableTable2.class), - "project", Expressions.constant(fields)))); - } -} - -// End CsvTableScan.java http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java ---------------------------------------------------------------------- 
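The scan-with-projection machinery deleted above (the `fields` array carried by CsvTableScan2, `identityList` in `toRel()`, and `ArrayRowConverter.convertRow`) boils down to narrowing each source row to a requested list of column indices. A minimal, self-contained sketch of that idea, in plain Java with no Calcite dependency (the class and method names below are illustrative, not from the commit):

```java
import java.util.Arrays;

// Illustrative sketch of the projection logic in the deleted classes above:
// a table scan asks for a subset of column indices, and each source row
// is narrowed to exactly those columns, in the requested order.
public class RowProjection {

    // Mirrors the "request all fields" case (CsvEnumerator.identityList in toRel()):
    // the identity projection [0, 1, ..., n-1].
    static int[] identityList(int n) {
        int[] fields = new int[n];
        for (int i = 0; i < n; i++) {
            fields[i] = i;
        }
        return fields;
    }

    // Narrows one row to the requested field indices,
    // in the spirit of ArrayRowConverter.convertRow.
    static Object[] project(Object[] row, int[] fields) {
        Object[] out = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            out[i] = row[fields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {"alice", 42, "nyc"};
        // Project columns 2 and 0, in that order.
        System.out.println(Arrays.toString(project(row, new int[] {2, 0})));
    }
}
```

In the real classes, the planner decides which indices to request (via CsvProjectTableScanRule) so that unused CSV columns are never materialized; the sketch only shows the per-row mechanics.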
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java deleted file mode 100644 index bc28fdd..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.adapter.csv; - -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.linq4j.AbstractEnumerable; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.linq4j.QueryProvider; -import org.apache.calcite.linq4j.Queryable; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.QueryableTable; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Schemas; -import org.apache.calcite.schema.TranslatableTable; - -import au.com.bytecode.opencsv.CSVReader; - -import java.io.IOException; -import java.io.Reader; -import java.lang.reflect.Type; -import java.util.ArrayList; - -/** - * Table based on a CSV stream. - */ -public class CsvTranslatableTable2 extends CsvTable - implements QueryableTable, TranslatableTable { - - final private CSVReader csvReader; - private CsvEnumerator2 csvEnumerator2; - final private String[] firstLine; - - /** Creates a CsvTable. - */ - CsvTranslatableTable2(Reader readerx, RelProtoDataType protoRowType) { - super(null, protoRowType); - this.csvReader = new CSVReader(readerx); - try { - this.firstLine = csvReader.readNext(); - } catch (IOException e) { - throw new RuntimeException("csvReader.readNext() failed ", e); - } - } - - public String toString() { - return "CsvTranslatableTable2"; - } - - /** Returns an enumerable over a given projection of the fields. - * - *
<p>
Called from generated code. */ - public Enumerable project(final int[] fields) { - return new AbstractEnumerable() { - public Enumerator enumerator() { - return csvEnumerator2; - } - }; - } - - public Expression getExpression(SchemaPlus schema, String tableName, - Class clazz) { - return Schemas.tableExpression(schema, getElementType(), tableName, clazz); - } - - public Type getElementType() { - return Object[].class; - } - - public Queryable asQueryable(QueryProvider queryProvider, - SchemaPlus schema, String tableName) { - throw new UnsupportedOperationException(); - } - - public RelNode toRel( - RelOptTable.ToRelContext context, - RelOptTable relOptTable) { - // Request all fields. - final int fieldCount = relOptTable.getRowType().getFieldCount(); - final int[] fields = CsvEnumerator.identityList(fieldCount); - return new CsvTableScan2(context.getCluster(), relOptTable, this, fields); - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - RelDataType rowType = null; - - if (fieldTypes == null) { - fieldTypes = new ArrayList(); - rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, fieldTypes); - } else { - rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, null); - } - - if (csvEnumerator2==null) - csvEnumerator2 = new CsvEnumerator2(csvReader, fieldTypes); - - return rowType; - } -} - -// End CsvTranslatableTable2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java deleted file 
mode 100644 index 718f462..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import static java.sql.Types.CHAR; -import static java.sql.Types.LONGNVARCHAR; -import static java.sql.Types.LONGVARCHAR; -import static java.sql.Types.NCHAR; -import static java.sql.Types.NVARCHAR; -import static java.sql.Types.VARCHAR; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.Reader; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.calcite.adapter.csv.CsvSchemaFactory2; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; 
-import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.util.StopWatch; - -import com.google.common.collect.ImmutableMap; - -@EventDriven -@SideEffectFree -@SupportsBatching -@Tags({"xml", "xslt", "transform"}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported." - + "Columns can be renamed, simple calculations performed, aggregations, etc." - + "SQL select statement is used to specify how CSV data should be transformed." - + "SQL statement follows standard SQL, some restrictions may apply." - + "Successfully transformed CSV data is routed to the 'success' relationship." - + "If transform fails, the original FlowFile is routed to the 'failure' relationship") -public class FilterCSVColumns extends AbstractProcessor { - - public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder() - .name("SQL select statement") - .description("SQL select statement specifies how CSV data should be transformed. 
" - + "Sql select should select from CSV.A table") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("The FlowFile with transformed content will be routed to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship") - .build(); - - private List properties; - private Set relationships; - - @Override - protected void init(final ProcessorInitializationContext context) { - final List properties = new ArrayList<>(); - properties.add(SQL_SELECT); - this.properties = Collections.unmodifiableList(properties); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final FlowFile original = session.get(); - if (original == null) { - return; - } - - final ProcessorLog logger = getLogger(); - final StopWatch stopWatch = new StopWatch(true); - - try { - FlowFile transformed = session.write(original, new StreamCallback() { - @Override - public void process(final InputStream rawIn, final OutputStream out) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - - String sql = context.getProperty(SQL_SELECT).getValue(); - final ResultSet resultSet = transform(rawIn, sql); - convertToCSV(resultSet, out); - - } catch (final Exception e) { - throw new IOException(e); - } - } - 
}); - session.transfer(transformed, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.info("Transformed {}", new Object[]{original}); - } catch (ProcessException e) { - logger.error("Unable to transform {} due to {}", new Object[]{original, e}); - session.transfer(original, REL_FAILURE); - } - } - - static protected ResultSet transform(InputStream rawIn, String sql) throws SQLException { - - Reader readerx = new InputStreamReader(rawIn); - HashMap inputs = new HashMap<>(); - inputs.put("A", readerx); - - Statement statement = null; - final Properties properties = new Properties(); -// properties.setProperty("caseSensitive", "true"); - try (final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties)) { - final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); - - final SchemaPlus rootSchema = calciteConnection.getRootSchema(); - final Schema schema = - new CsvSchemaFactory2(inputs) - .create(rootSchema, "CSV", ImmutableMap.of("flavor", "TRANSLATABLE")); - - calciteConnection.getRootSchema().add("CSV", schema); - rootSchema.add("default", schema); - - statement = connection.createStatement(); - final ResultSet resultSet = statement.executeQuery(sql); - return resultSet; - } - } - - static protected void convertToCSV(ResultSet resultSet, OutputStream out) throws SQLException, IOException { - - convertToCsvStream(resultSet, out); - } - - public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { - return convertToCsvStream(rs, outStream, null, null); - } - - public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) - throws SQLException, IOException { - - final ResultSetMetaData meta = rs.getMetaData(); - final int nrOfColumns = meta.getColumnCount(); - List columnNames = new 
ArrayList<>(nrOfColumns); - - for (int i = 1; i <= nrOfColumns; i++) { - String columnNameFromMeta = meta.getColumnName(i); - // Hive returns table.column for column name. Grab the column name as the string after the last period - int columnNameDelimiter = columnNameFromMeta.lastIndexOf("."); - columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1)); - } - - // Write column names as header row - outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8)); - outStream.write("\n".getBytes(StandardCharsets.UTF_8)); - - // Iterate over the rows - long nrOfRows = 0; - while (rs.next()) { - if (callback != null) { - callback.processRow(rs); - } - List rowValues = new ArrayList<>(nrOfColumns); - for (int i = 1; i <= nrOfColumns; i++) { - final int javaSqlType = meta.getColumnType(i); - final Object value = rs.getObject(i); - - switch (javaSqlType) { - case CHAR: - case LONGNVARCHAR: - case LONGVARCHAR: - case NCHAR: - case NVARCHAR: - case VARCHAR: - rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\""); - break; - default: - rowValues.add(value.toString()); - } - } - // Write row values - outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8)); - outStream.write("\n".getBytes(StandardCharsets.UTF_8)); - nrOfRows++; - } - return nrOfRows; - } - - /** - * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing. - * IMPORTANT: This method should only work on the row pointed at by the current ResultSet reference. - * Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation. - */ - public interface ResultSetRowCallback { - void processRow(ResultSet resultSet) throws IOException; - } -}
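The `convertToCsvStream` method in the deleted FilterCSVColumns above applies three per-row rules: trim Hive-style "table.column" names down to "column", quote textual values, and join each row's values with commas. A self-contained sketch of those rules in plain Java (the `escapeCsv` below is a simplified stand-in for Commons Lang's `StringEscapeUtils.escapeCsv`, and it quotes every value rather than only the character-typed SQL columns the original switches on; class and method names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the row-serialization rules used by convertToCsvStream above.
public class CsvRows {

    // Hive returns "table.column" for a column name; keep only the part
    // after the last period, as the deleted code does with lastIndexOf(".").
    static String trimTablePrefix(String columnName) {
        return columnName.substring(columnName.lastIndexOf('.') + 1);
    }

    // Minimal CSV escaping: double any embedded quotes, then wrap in quotes.
    // Simplified stand-in for StringEscapeUtils.escapeCsv.
    static String escapeCsv(String value) {
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }

    // One output line per row: escaped values joined by commas.
    // The newline terminator is written by the caller, as in the original.
    static String toCsvRow(List<String> values) {
        return values.stream().map(CsvRows::escapeCsv).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(trimTablePrefix("csv.a.name"));
        System.out.println(toCsvRow(List.of("a,b", "he said \"hi\"")));
    }
}
```

One consequence of the original's type switch worth noting: non-character columns fall through to `value.toString()` unquoted, so a NULL in such a column would throw a NullPointerException there; the sketch sidesteps that by treating all values as strings.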