nifi-commits mailing list archives

From alopre...@apache.org
Subject [17/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecordParser and MockRecordWriter - R
Date Mon, 01 May 2017 20:12:10 GMT
NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet
- Creating nifi-records-utils to share utility code from record services
- Refactoring Parquet tests to use MockRecordParser and MockRecordWriter
- Refactoring AbstractPutHDFSRecord to use schema access strategy
- Adding custom validate to AbstractPutHDFSRecord and adding handling of UNION types when writing Records as Avro
- Refactoring project structure to get CS API references out of nifi-commons, introducing nifi-extension-utils under nifi-nar-bundles
- Updating abstract put/fetch processors to obtain the WriteResult and update flow file attributes

This closes #1712.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
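
To illustrate the UNION handling mentioned above: when writing Records as Avro, a nullable field is represented as a UNION schema such as ["null", "string"], and the writer must resolve the concrete member before converting a value. The following is a minimal sketch using only the standard Avro Schema API; it is illustrative and is not the commit's actual AvroTypeUtil/DataTypeUtils code.

import org.apache.avro.Schema;

public class UnionResolutionSketch {
    // Pick the first non-null member of a UNION such as ["null","string"].
    // Member selection for multi-type unions would require inspecting the
    // value being written, which is omitted from this sketch.
    static Schema resolveNonNull(final Schema schema) {
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        for (final Schema member : schema.getTypes()) {
            if (member.getType() != Schema.Type.NULL) {
                return member;
            }
        }
        throw new IllegalArgumentException("UNION contains only null");
    }
}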


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/60d88b5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/60d88b5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/60d88b5a

Branch: refs/heads/master
Commit: 60d88b5a64286776ffc2c9088905ac78a1a56786
Parents: 11b935a
Author: Bryan Bende <bbende@apache.org>
Authored: Wed Apr 12 18:25:31 2017 -0400
Committer: Andy LoPresto <alopresto@apache.org>
Committed: Mon May 1 16:10:35 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   8 +
 nifi-assembly/pom.xml                           |   5 +
 nifi-commons/nifi-hadoop-utils/pom.xml          |  60 --
 .../apache/nifi/hadoop/KerberosProperties.java  | 144 ----
 .../nifi/hadoop/KerberosTicketRenewer.java      |  91 ---
 .../org/apache/nifi/hadoop/SecurityUtil.java    | 116 ----
 .../nifi/hadoop/TestKerberosProperties.java     |  90 ---
 .../src/test/resources/krb5.conf                |  12 -
 nifi-commons/nifi-processor-utilities/pom.xml   |  67 --
 .../org/apache/nifi/processor/util/bin/Bin.java | 176 -----
 .../nifi/processor/util/bin/BinFiles.java       | 358 ----------
 .../nifi/processor/util/bin/BinManager.java     | 306 ---------
 .../AbstractListenEventBatchingProcessor.java   | 269 --------
 .../listen/AbstractListenEventProcessor.java    | 284 --------
 .../util/listen/ListenerProperties.java         |  87 ---
 .../dispatcher/AsyncChannelDispatcher.java      |  40 --
 .../listen/dispatcher/ChannelDispatcher.java    |  52 --
 .../dispatcher/DatagramChannelDispatcher.java   | 181 -----
 .../dispatcher/SocketChannelAttachment.java     |  44 --
 .../dispatcher/SocketChannelDispatcher.java     | 284 --------
 .../nifi/processor/util/listen/event/Event.java |  46 --
 .../util/listen/event/EventFactory.java         |  44 --
 .../util/listen/event/EventFactoryUtil.java     |  33 -
 .../processor/util/listen/event/EventQueue.java |  66 --
 .../util/listen/event/StandardEvent.java        |  52 --
 .../util/listen/event/StandardEventFactory.java |  37 -
 .../util/listen/handler/ChannelHandler.java     |  55 --
 .../listen/handler/ChannelHandlerFactory.java   |  46 --
 .../handler/socket/SSLSocketChannelHandler.java | 153 -----
 .../handler/socket/SocketChannelHandler.java    |  51 --
 .../socket/SocketChannelHandlerFactory.java     |  55 --
 .../socket/StandardSocketChannelHandler.java    | 158 -----
 .../util/listen/response/ChannelResponder.java  |  50 --
 .../util/listen/response/ChannelResponse.java   |  29 -
 .../socket/SSLSocketChannelResponder.java       |  44 --
 .../response/socket/SocketChannelResponder.java |  69 --
 .../util/pattern/DiscontinuedException.java     |  31 -
 .../nifi/processor/util/pattern/ErrorTypes.java | 148 ----
 .../util/pattern/ExceptionHandler.java          | 235 -------
 .../util/pattern/PartialFunctions.java          | 122 ----
 .../apache/nifi/processor/util/pattern/Put.java | 228 -------
 .../nifi/processor/util/pattern/PutGroup.java   |  97 ---
 .../util/pattern/RollbackOnFailure.java         | 226 -------
 .../processor/util/pattern/RoutingResult.java   |  50 --
 .../util/put/AbstractPutEventProcessor.java     | 575 ----------------
 .../util/put/sender/ChannelSender.java          | 109 ---
 .../util/put/sender/DatagramChannelSender.java  |  79 ---
 .../util/put/sender/SSLSocketChannelSender.java |  71 --
 .../util/put/sender/SocketChannelSender.java    |  98 ---
 .../util/pattern/TestExceptionHandler.java      | 202 ------
 .../util/pattern/TestRollbackOnFailure.java     | 144 ----
 nifi-commons/nifi-record/pom.xml                |  31 +
 .../apache/nifi/schema/access/SchemaField.java  |  37 +
 .../schema/access/SchemaNotFoundException.java  |  32 +
 .../serialization/MalformedRecordException.java |  31 +
 .../apache/nifi/serialization/RecordReader.java |  80 +++
 .../nifi/serialization/RecordSetWriter.java     |  45 ++
 .../apache/nifi/serialization/RecordWriter.java |  41 ++
 .../nifi/serialization/SimpleRecordSchema.java  | 193 ++++++
 .../apache/nifi/serialization/WriteResult.java  |  69 ++
 .../nifi/serialization/record/DataType.java     |  68 ++
 .../serialization/record/ListRecordSet.java     |  44 ++
 .../nifi/serialization/record/MapRecord.java    | 227 +++++++
 .../serialization/record/PushBackRecordSet.java |  67 ++
 .../nifi/serialization/record/Record.java       |  64 ++
 .../nifi/serialization/record/RecordField.java  | 101 +++
 .../serialization/record/RecordFieldType.java   | 337 ++++++++++
 .../nifi/serialization/record/RecordSchema.java |  79 +++
 .../nifi/serialization/record/RecordSet.java    |  91 +++
 .../record/ResultSetRecordSet.java              | 325 +++++++++
 .../serialization/record/SchemaIdentifier.java  |  51 ++
 .../record/StandardSchemaIdentifier.java        |  69 ++
 .../record/TypeMismatchException.java           |  28 +
 .../record/type/ArrayDataType.java              |  67 ++
 .../record/type/ChoiceDataType.java             |  68 ++
 .../serialization/record/type/MapDataType.java  |  67 ++
 .../record/type/RecordDataType.java             |  68 ++
 .../record/util/DataTypeUtils.java              | 670 +++++++++++++++++++
 .../util/IllegalTypeConversionException.java    |  29 +
 .../serialization/TestSimpleRecordSchema.java   |  79 +++
 .../serialization/record/TestMapRecord.java     | 188 ++++++
 nifi-commons/pom.xml                            |   3 +-
 .../nifi-hadoop-utils/pom.xml                   |  61 ++
 .../apache/nifi/hadoop/KerberosProperties.java  | 144 ++++
 .../nifi/hadoop/KerberosTicketRenewer.java      |  91 +++
 .../org/apache/nifi/hadoop/SecurityUtil.java    | 116 ++++
 .../hadoop/AbstractHadoopProcessor.java         | 521 ++++++++++++++
 .../nifi/processors/hadoop/CompressionType.java |  51 ++
 .../processors/hadoop/HadoopValidators.java     |  98 +++
 .../nifi/hadoop/TestKerberosProperties.java     |  90 +++
 .../src/test/resources/krb5.conf                |  12 +
 .../nifi-processor-utils/pom.xml                |  67 ++
 .../org/apache/nifi/processor/util/bin/Bin.java | 176 +++++
 .../nifi/processor/util/bin/BinFiles.java       | 358 ++++++++++
 .../nifi/processor/util/bin/BinManager.java     | 306 +++++++++
 .../AbstractListenEventBatchingProcessor.java   | 269 ++++++++
 .../listen/AbstractListenEventProcessor.java    | 284 ++++++++
 .../util/listen/ListenerProperties.java         |  87 +++
 .../dispatcher/AsyncChannelDispatcher.java      |  40 ++
 .../listen/dispatcher/ChannelDispatcher.java    |  52 ++
 .../dispatcher/DatagramChannelDispatcher.java   | 181 +++++
 .../dispatcher/SocketChannelAttachment.java     |  44 ++
 .../dispatcher/SocketChannelDispatcher.java     | 284 ++++++++
 .../nifi/processor/util/listen/event/Event.java |  46 ++
 .../util/listen/event/EventFactory.java         |  44 ++
 .../util/listen/event/EventFactoryUtil.java     |  33 +
 .../processor/util/listen/event/EventQueue.java |  66 ++
 .../util/listen/event/StandardEvent.java        |  52 ++
 .../util/listen/event/StandardEventFactory.java |  37 +
 .../util/listen/handler/ChannelHandler.java     |  55 ++
 .../listen/handler/ChannelHandlerFactory.java   |  46 ++
 .../handler/socket/SSLSocketChannelHandler.java | 153 +++++
 .../handler/socket/SocketChannelHandler.java    |  51 ++
 .../socket/SocketChannelHandlerFactory.java     |  55 ++
 .../socket/StandardSocketChannelHandler.java    | 158 +++++
 .../util/listen/response/ChannelResponder.java  |  50 ++
 .../util/listen/response/ChannelResponse.java   |  29 +
 .../socket/SSLSocketChannelResponder.java       |  44 ++
 .../response/socket/SocketChannelResponder.java |  69 ++
 .../util/pattern/DiscontinuedException.java     |  31 +
 .../nifi/processor/util/pattern/ErrorTypes.java | 148 ++++
 .../util/pattern/ExceptionHandler.java          | 235 +++++++
 .../util/pattern/PartialFunctions.java          | 122 ++++
 .../apache/nifi/processor/util/pattern/Put.java | 228 +++++++
 .../nifi/processor/util/pattern/PutGroup.java   |  97 +++
 .../util/pattern/RollbackOnFailure.java         | 226 +++++++
 .../processor/util/pattern/RoutingResult.java   |  50 ++
 .../util/put/AbstractPutEventProcessor.java     | 575 ++++++++++++++++
 .../util/put/sender/ChannelSender.java          | 109 +++
 .../util/put/sender/DatagramChannelSender.java  |  79 +++
 .../util/put/sender/SSLSocketChannelSender.java |  71 ++
 .../util/put/sender/SocketChannelSender.java    |  98 +++
 .../util/pattern/TestExceptionHandler.java      | 202 ++++++
 .../util/pattern/TestRollbackOnFailure.java     | 144 ++++
 .../nifi-avro-record-utils/pom.xml              |  45 ++
 .../apache/nifi/avro/AvroSchemaValidator.java   |  54 ++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 504 ++++++++++++++
 .../schema/access/AvroSchemaTextStrategy.java   |  63 ++
 .../nifi/schema/access/SchemaAccessUtils.java   | 159 +++++
 .../nifi-hadoop-record-utils/pom.xml            |  56 ++
 .../hadoop/AbstractFetchHDFSRecord.java         | 279 ++++++++
 .../hadoop/AbstractPutHDFSRecord.java           | 541 +++++++++++++++
 .../hadoop/exception/FailureException.java      |  32 +
 .../exception/InvalidSchemaException.java       |  31 +
 .../exception/RecordReaderFactoryException.java |  31 +
 .../hadoop/record/HDFSRecordReader.java         |  31 +
 .../hadoop/record/HDFSRecordWriter.java         |  55 ++
 .../nifi-mock-record-utils/pom.xml              |  45 ++
 .../serialization/record/MockRecordParser.java  | 103 +++
 .../serialization/record/MockRecordWriter.java  | 123 ++++
 .../nifi-standard-record-utils/pom.xml          |  45 ++
 ...onworksAttributeSchemaReferenceStrategy.java | 115 ++++
 ...rtonworksAttributeSchemaReferenceWriter.java |  69 ++
 ...rtonworksEncodedSchemaReferenceStrategy.java |  76 +++
 ...HortonworksEncodedSchemaReferenceWriter.java |  78 +++
 .../schema/access/SchemaAccessStrategy.java     |  41 ++
 .../nifi/schema/access/SchemaAccessWriter.java  |  63 ++
 .../schema/access/SchemaNameAsAttribute.java    |  62 ++
 .../access/SchemaNamePropertyStrategy.java      |  68 ++
 .../schema/access/SchemaTextAsAttribute.java    |  60 ++
 .../nifi/serialization/DateTimeUtils.java       |  50 ++
 .../SimpleDateFormatValidator.java              |  48 ++
 .../nifi-record-utils/pom.xml                   |  33 +
 nifi-nar-bundles/nifi-extension-utils/pom.xml   |  35 +
 .../nifi-hdfs-processors/pom.xml                |  14 +-
 .../hadoop/AbstractHadoopProcessor.java         | 580 ----------------
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  56 +-
 .../hadoop/TestCreateHadoopSequenceFile.java    |   6 +-
 .../nifi-hive-processors/pom.xml                |   6 -
 .../nifi-parquet-nar/pom.xml                    |  46 ++
 .../src/main/resources/META-INF/LICENSE         | 239 +++++++
 .../src/main/resources/META-INF/NOTICE          | 105 +++
 .../nifi-parquet-processors/pom.xml             | 100 +++
 .../nifi/processors/parquet/FetchParquet.java   |  63 ++
 .../nifi/processors/parquet/PutParquet.java     | 278 ++++++++
 .../record/AvroParquetHDFSRecordReader.java     |  72 ++
 .../record/AvroParquetHDFSRecordWriter.java     |  52 ++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../processors/parquet/FetchParquetTest.java    | 282 ++++++++
 .../nifi/processors/parquet/PutParquetTest.java | 669 ++++++++++++++++++
 .../src/test/resources/avro/user.avsc           |   9 +
 .../src/test/resources/core-site.xml            |  25 +
 nifi-nar-bundles/nifi-parquet-bundle/pom.xml    |  35 +
 .../nifi-registry-service/pom.xml               |   4 +
 .../nifi-scripting-processors/pom.xml           |   4 +
 .../nifi-standard-processors/pom.xml            |   9 +
 .../standard/TestPutDatabaseRecord.groovy       |   2 +-
 .../processors/standard/TestConvertRecord.java  |   8 +-
 .../processors/standard/TestQueryRecord.java    |  20 +-
 .../processors/standard/TestSplitRecord.java    |  14 +-
 .../standard/util/record/MockRecordParser.java  | 103 ---
 .../standard/util/record/MockRecordWriter.java  | 124 ----
 .../nifi-hbase_1_1_2-client-service/pom.xml     |   6 -
 .../nifi-hwx-schema-registry-service/pom.xml    |   6 +-
 .../pom.xml                                     |   5 +
 .../apache/nifi/schema/access/SchemaField.java  |  37 -
 .../schema/access/SchemaNotFoundException.java  |  32 -
 .../nifi/serialization/DataTypeValidator.java   |  82 ---
 .../serialization/MalformedRecordException.java |  31 -
 .../apache/nifi/serialization/RecordReader.java |  80 ---
 .../nifi/serialization/RecordSetWriter.java     |  45 --
 .../apache/nifi/serialization/RecordWriter.java |  41 --
 .../nifi/serialization/SimpleRecordSchema.java  | 193 ------
 .../apache/nifi/serialization/WriteResult.java  |  69 --
 .../nifi/serialization/record/DataType.java     |  68 --
 .../serialization/record/ListRecordSet.java     |  44 --
 .../nifi/serialization/record/MapRecord.java    | 227 -------
 .../serialization/record/PushBackRecordSet.java |  67 --
 .../nifi/serialization/record/Record.java       |  64 --
 .../nifi/serialization/record/RecordField.java  | 101 ---
 .../serialization/record/RecordFieldType.java   | 337 ----------
 .../nifi/serialization/record/RecordSchema.java |  79 ---
 .../nifi/serialization/record/RecordSet.java    |  91 ---
 .../record/ResultSetRecordSet.java              | 325 ---------
 .../serialization/record/SchemaIdentifier.java  |  51 --
 .../record/StandardSchemaIdentifier.java        |  69 --
 .../record/TypeMismatchException.java           |  28 -
 .../record/type/ArrayDataType.java              |  67 --
 .../record/type/ChoiceDataType.java             |  68 --
 .../serialization/record/type/MapDataType.java  |  67 --
 .../record/type/RecordDataType.java             |  68 --
 .../record/util/DataTypeUtils.java              | 670 -------------------
 .../util/IllegalTypeConversionException.java    |  29 -
 .../serialization/TestSimpleRecordSchema.java   |  79 ---
 .../serialization/record/TestMapRecord.java     | 188 ------
 .../nifi-record-serialization-services/pom.xml  |   8 +
 .../java/org/apache/nifi/avro/AvroReader.java   |   3 +-
 .../org/apache/nifi/avro/AvroRecordReader.java  | 167 +----
 .../apache/nifi/avro/AvroRecordSetWriter.java   |   3 +-
 .../apache/nifi/avro/AvroSchemaValidator.java   |  54 --
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 187 ------
 .../org/apache/nifi/avro/WriteAvroResult.java   | 163 +----
 .../avro/WriteAvroResultWithExternalSchema.java |   2 +-
 .../nifi/avro/WriteAvroResultWithSchema.java    |   2 +-
 .../nifi/csv/CSVHeaderSchemaStrategy.java       |   8 +-
 .../java/org/apache/nifi/csv/CSVReader.java     |   9 +-
 .../main/java/org/apache/nifi/csv/CSVUtils.java |   3 +-
 .../java/org/apache/nifi/grok/GrokReader.java   |   6 +-
 .../schema/access/AvroSchemaTextStrategy.java   |  64 --
 ...onworksAttributeSchemaReferenceStrategy.java | 116 ----
 ...rtonworksAttributeSchemaReferenceWriter.java |  69 --
 ...rtonworksEncodedSchemaReferenceStrategy.java |  77 ---
 ...HortonworksEncodedSchemaReferenceWriter.java |  78 ---
 .../schema/access/SchemaAccessStrategy.java     |  43 --
 .../nifi/schema/access/SchemaAccessWriter.java  |  63 --
 .../schema/access/SchemaNameAsAttribute.java    |  62 --
 .../access/SchemaNamePropertyStrategy.java      |  69 --
 .../schema/access/SchemaTextAsAttribute.java    |  60 --
 .../nifi/serialization/DateTimeUtils.java       |  50 --
 .../serialization/SchemaRegistryService.java    | 139 +---
 .../SimpleDateFormatValidator.java              |  48 --
 .../apache/nifi/text/FreeFormTextWriter.java    |   6 +-
 .../nifi/csv/TestCSVHeaderSchemaStrategy.java   |   4 +-
 .../nifi-schema-registry-service-api/pom.xml    |   4 +
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/pom.xml                        |  22 +-
 pom.xml                                         |  31 +
 257 files changed, 15268 insertions(+), 11868 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 38476f3..6125d94 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1236,6 +1236,14 @@ and can be found in the org.apache.hadoop.hive.ql.io.orc package
 
          https://github.com/triplecheck/TLSH
 
+  (ASLv2) Apache Parquet
+    The following NOTICE information applies:
+      Apache Parquet MR (Incubating)
+      Copyright 2014 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
   (ASLv2) Hortonworks Schema Registry
     The following NOTICE information applies:
       Hortonworks Schema Registry

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index a009acd..a83fc52 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -481,6 +481,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-parquet-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hwx-schema-registry-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml b/nifi-commons/nifi-hadoop-utils/pom.xml
deleted file mode 100644
index 198b4be..0000000
--- a/nifi-commons/nifi-hadoop-utils/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons</artifactId>
-        <version>1.2.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-hadoop-utils</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>        
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.rat</groupId>
-                <artifactId>apache-rat-plugin</artifactId>
-                <configuration>
-                    <excludes combine.children="append">
-                        <exclude>src/test/resources/krb5.conf</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
deleted file mode 100644
index af10079..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * All processors and controller services that need properties for Kerberos
- * Principal and Keytab should obtain them through this class by calling:
- *
- * KerberosProperties props =
- * KerberosProperties.create(NiFiProperties.getInstance())
- *
- * The properties can be accessed from the resulting KerberosProperties
- * instance.
- */
-public class KerberosProperties {
-
-    private final File kerberosConfigFile;
-    private final Validator kerberosConfigValidator;
-    private final PropertyDescriptor kerberosPrincipal;
-    private final PropertyDescriptor kerberosKeytab;
-
-    /**
-     * Instantiate a KerberosProperties object but keep in mind it is
-     * effectively a singleton because the krb5.conf file needs to be set as a
-     * system property which this constructor will take care of.
-     *
-     * @param kerberosConfigFile file of krb5.conf
-     */
-    public KerberosProperties(final File kerberosConfigFile) {
-        this.kerberosConfigFile = kerberosConfigFile;
-
-        this.kerberosConfigValidator = new Validator() {
-            @Override
-            public ValidationResult validate(String subject, String input, ValidationContext context) {
-                // Check that the Kerberos configuration is set
-                if (kerberosConfigFile == null) {
-                    return new ValidationResult.Builder()
-                            .subject(subject).input(input).valid(false)
-                            .explanation("you are missing the nifi.kerberos.krb5.file property which "
-                                    + "must be set in order to use Kerberos")
-                            .build();
-                }
-
-                // Check that the Kerberos configuration is readable
-                if (!kerberosConfigFile.canRead()) {
-                    return new ValidationResult.Builder().subject(subject).input(input).valid(false)
-                            .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid "
-                                    + "and nifi has adequate permissions", kerberosConfigFile.getAbsoluteFile()))
-                            .build();
-                }
-
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            }
-        };
-
-        this.kerberosPrincipal = new PropertyDescriptor.Builder()
-                .name("Kerberos Principal")
-                .required(false)
-                .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
-                .addValidator(kerberosConfigValidator)
-                .build();
-
-        this.kerberosKeytab = new PropertyDescriptor.Builder()
-                .name("Kerberos Keytab").required(false)
-                .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
-                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-                .addValidator(kerberosConfigValidator)
-                .build();
-    }
-
-    public File getKerberosConfigFile() {
-        return kerberosConfigFile;
-    }
-
-    public Validator getKerberosConfigValidator() {
-        return kerberosConfigValidator;
-    }
-
-    public PropertyDescriptor getKerberosPrincipal() {
-        return kerberosPrincipal;
-    }
-
-    public PropertyDescriptor getKerberosKeytab() {
-        return kerberosKeytab;
-    }
-
-    public static List<ValidationResult> validatePrincipalAndKeytab(final String subject, final Configuration config, final String principal, final String keytab, final ComponentLog logger) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        // if security is enabled then the keytab and principal are required
-        final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(config);
-
-        final boolean blankPrincipal = (principal == null || principal.isEmpty());
-        if (isSecurityEnabled && blankPrincipal) {
-            results.add(new ValidationResult.Builder()
-                    .valid(false)
-                    .subject(subject)
-                    .explanation("Kerberos Principal must be provided when using a secure configuration")
-                    .build());
-        }
-
-        final boolean blankKeytab = (keytab == null || keytab.isEmpty());
-        if (isSecurityEnabled && blankKeytab) {
-            results.add(new ValidationResult.Builder()
-                    .valid(false)
-                    .subject(subject)
-                    .explanation("Kerberos Keytab must be provided when using a secure configuration")
-                    .build());
-        }
-
-        if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) {
-            logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored");
-        }
-
-        return results;
-    }
-
-}
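
As a usage note: the KerberosProperties class deleted here is re-added under nifi-extension-utils later in this commit, and is typically wired into a processor during initialization. A hedged sketch based only on the constructor and getters shown above; the krb5.conf path is a placeholder, and in NiFi it would come from the nifi.kerberos.krb5.file property.

import java.io.File;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.hadoop.KerberosProperties;

public class KerberosWiringSketch {
    public static void main(String[] args) {
        // Placeholder path; real deployments read nifi.kerberos.krb5.file.
        final File krb5Config = new File("/etc/krb5.conf");
        final KerberosProperties props = new KerberosProperties(krb5Config);

        // These descriptors would be added to a processor's supported property list.
        final PropertyDescriptor principal = props.getKerberosPrincipal();
        final PropertyDescriptor keytab = props.getKerberosKeytab();
        System.out.println(principal.getName() + ", " + keytab.getName());
    }
}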

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
deleted file mode 100644
index bf922fe..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
- *
- * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which
- * will re-login the user if the ticket expired or is close to expiry. Between
- * relogin attempts this thread will sleep for the provided amount of time.
- *
- */
-public class KerberosTicketRenewer implements Runnable {
-
-    private final UserGroupInformation ugi;
-    private final long renewalPeriod;
-    private final ComponentLog logger;
-
-    private volatile boolean stopped = false;
-
-    /**
-     * @param ugi
-     *          the user to renew the ticket for
-     * @param renewalPeriod
-     *          the amount of time in milliseconds to wait between renewal attempts
-     * @param logger
-     *          the logger from the component that started the renewer
-     */
-    public KerberosTicketRenewer(final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
-        this.ugi = ugi;
-        this.renewalPeriod = renewalPeriod;
-        this.logger = logger;
-    }
-
-    @Override
-    public void run() {
-        stopped = false;
-        while (!stopped) {
-            try {
-                logger.debug("Invoking renewal attempt for Kerberos ticket");
-                // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime.
-                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return null;
-                });
-            } catch (IOException e) {
-                logger.error("Failed to renew Kerberos ticket", e);
-            } catch (InterruptedException e) {
-                logger.error("Interrupted while attempting to renew Kerberos ticket", e);
-                Thread.currentThread().interrupt();
-                return;
-            }
-
-            logger.debug("current UGI {}", new Object[]{ugi});
-
-            // Wait for a bit before checking again.
-            try {
-                Thread.sleep(renewalPeriod);
-            } catch (InterruptedException e) {
-                logger.error("Renewal thread interrupted", e);
-                Thread.currentThread().interrupt();
-                return;
-            }
-        }
-    }
-
-    public void stop() {
-        stopped = true;
-    }
-
-}
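
A sketch of how this renewer is typically driven; SecurityUtil.startTicketRenewalThread (shown in the next file) wraps exactly this pattern. The UGI and logger would come from the processor's login and context, so they are parameters here rather than real instances.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.logging.ComponentLog;

public class RenewerSketch {
    static KerberosTicketRenewer startRenewer(final UserGroupInformation ugi, final ComponentLog logger) {
        // Attempt renewal every five minutes; per the class javadoc, Hadoop only
        // re-logs in near ticket expiry, so a short period is inexpensive.
        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, TimeUnit.MINUTES.toMillis(5), logger);
        final Thread thread = new Thread(renewer, "Kerberos Ticket Renewal");
        thread.setDaemon(true);
        thread.start();
        return renewer; // caller keeps the reference and invokes stop() on shutdown
    }
}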

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
deleted file mode 100644
index fcb9032..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.IOException;
-
-/**
- * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
- * interfering with each other.
- */
-public class SecurityUtil {
-    public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
-    public static final String KERBEROS = "kerberos";
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal
-     * and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying
-     * UserGroupInformation.
-     *
-     * @param config the configuration instance
-     * @param principal the principal to authenticate as
-     * @param keyTab the keytab to authenticate with
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab)
-            throws IOException {
-        Validate.notNull(config);
-        Validate.notNull(principal);
-        Validate.notNull(keyTab);
-
-        UserGroupInformation.setConfiguration(config);
-        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim());
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.getLoginUser().
-     * All logins should happen through this class to ensure other threads are not concurrently modifying
-     * UserGroupInformation.
-     *
-     * @param config the configuration instance
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException {
-        Validate.notNull(config);
-        UserGroupInformation.setConfiguration(config);
-        return UserGroupInformation.getLoginUser();
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.isSecurityEnabled().
-     *
-     * All checks for isSecurityEnabled() should happen through this method.
-     *
-     * @param config the given configuration
-     *
-     * @return true if kerberos is enabled on the given configuration, false otherwise
-     *
-     */
-    public static boolean isSecurityEnabled(final Configuration config) {
-        Validate.notNull(config);
-        return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
-    }
-
-    /**
-     * Start a thread that periodically attempts to renew the current Kerberos user's ticket.
-     *
-     * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread.
-     *
-     * @param id
-     *          The unique identifier to use for the thread, can be the class name that started the thread
-     *              (i.e. PutHDFS, etc)
-     * @param ugi
-     *          The current Kerberos user.
-     * @param renewalPeriod
-     *          The amount of time between attempting renewals.
-     * @param logger
-     *          The logger to use with in the renewer
-     *
-     * @return the KerberosTicketRenewer Runnable
-     */
-    public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
-        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger);
-
-        final Thread t = new Thread(renewer);
-        t.setName("Kerberos Ticket Renewal [" + id + "]");
-        t.start();
-
-        return renewer;
-    }
-
-}
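
Putting the pieces together, a typical secure-login sequence with this utility looks like the following sketch; the principal, keytab, and renewal period are placeholders, and real values come from processor properties.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.hadoop.KerberosTicketRenewer;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;

public class SecureLoginSketch {
    static void login(final Configuration config, final ComponentLog logger) throws IOException {
        if (SecurityUtil.isSecurityEnabled(config)) {
            // Placeholder credentials.
            final UserGroupInformation ugi = SecurityUtil.loginKerberos(
                    config, "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.keytab");
            // Store the renewer and call stop() on it when the component is stopped.
            final KerberosTicketRenewer renewer =
                    SecurityUtil.startTicketRenewalThread("PutHDFS", ugi, 4L * 60 * 60 * 1000, logger);
        } else {
            SecurityUtil.loginSimple(config);
        }
    }
}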

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
deleted file mode 100644
index 8cd1ea1..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.util.List;
-
-public class TestKerberosProperties {
-
-    @Test
-    public void testWithKerberosConfigFile() {
-        final File file = new File("src/test/resources/krb5.conf");
-
-        final KerberosProperties kerberosProperties = new KerberosProperties(file);
-        Assert.assertNotNull(kerberosProperties);
-
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigFile());
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
-        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
-        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
-
-        final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null);
-        Assert.assertTrue(result.isValid());
-    }
-
-    @Test
-    public void testWithoutKerberosConfigFile() {
-        final File file = new File("src/test/resources/krb5.conf");
-
-        final KerberosProperties kerberosProperties = new KerberosProperties(null);
-        Assert.assertNotNull(kerberosProperties);
-
-        Assert.assertNull(kerberosProperties.getKerberosConfigFile());
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
-        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
-        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
-
-        final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null);
-        Assert.assertFalse(result.isValid());
-    }
-
-    @Test
-    public void testValidatePrincipalAndKeytab() {
-        final ComponentLog log = Mockito.mock(ComponentLog.class);
-        final Configuration config = new Configuration();
-
-        // no security enabled in config so doesn't matter what principal and keytab are
-        List<ValidationResult> results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, null, null, log);
-        Assert.assertEquals(0, results.size());
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, "principal", null, log);
-        Assert.assertEquals(0, results.size());
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, "principal", "keytab", log);
-        Assert.assertEquals(0, results.size());
-
-        // change the config to have kerberos turned on
-        config.set("hadoop.security.authentication", "kerberos");
-        config.set("hadoop.security.authorization", "true");
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, null, null, log);
-        Assert.assertEquals(2, results.size());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
deleted file mode 100644
index 814d5b2..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-[libdefaults]
-  default_realm = EXAMPLE.COM
-
-[realms]
-  EXAMPLE.COM = {
-    kdc = kdc1.example.com
-    kdc = kdc2.example.com
-    admin_server = kdc1.example.com
-  }
-
-[domain_realm]
-  .example.com = EXAMPLE.COM
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/pom.xml b/nifi-commons/nifi-processor-utilities/pom.xml
deleted file mode 100644
index ce5ae0b..0000000
--- a/nifi-commons/nifi-processor-utilities/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons</artifactId>
-        <version>1.2.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-processor-utils</artifactId>
-    <packaging>jar</packaging>
-    <description>
-        This nifi-processor-utils module is designed to capture common patterns
-        and utilities that can be leveraged by other processors or components to
-        help promote reuse.  These patterns may become framework level features 
-        or may simply be made available through this utility.  It is ok for this
-        module to have dependencies but care should be taken when adding dependencies
-        as this increases the cost of utilizing this module in various nars.
-    </description>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
deleted file mode 100644
index fdbc71f..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-
-/**
- * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a {@code Bin}, the caller must synchronize
- * access.
- */
-public class Bin {
-    private final ProcessSession session;
-    private final long creationMomentEpochNs;
-    private final long minimumSizeBytes;
-    private final long maximumSizeBytes;
-
-    private volatile int minimumEntries = 0;
-    private volatile int maximumEntries = Integer.MAX_VALUE;
-    private final String fileCountAttribute;
-
-    final List<FlowFile> binContents = new ArrayList<>();
-    long size;
-    int successiveFailedOfferings = 0;
-
-    /**
-     * Constructs a new bin
-     *
-     * @param session the session
-     * @param minSizeBytes min bytes
-     * @param maxSizeBytes max bytes
-     * @param minEntries min entries
-     * @param maxEntries max entries
-     * @param fileCountAttribute num files
-     * @throws IllegalArgumentException if the min is not less than or equal to the max.
-     */
-    public Bin(final ProcessSession session, final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) {
-        this.session = session;
-        this.minimumSizeBytes = minSizeBytes;
-        this.maximumSizeBytes = maxSizeBytes;
-        this.minimumEntries = minEntries;
-        this.maximumEntries = maxEntries;
-        this.fileCountAttribute = fileCountAttribute;
-
-        this.creationMomentEpochNs = System.nanoTime();
-        if (minSizeBytes > maxSizeBytes) {
-            throw new IllegalArgumentException();
-        }
-    }
-
-    public ProcessSession getSession() {
-        return session;
-    }
-
-    /**
-     * Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of
-     * successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing)
-     *
-     * @return true if considered full; false otherwise
-     */
-    public boolean isFull() {
-        return (((size >= minimumSizeBytes) && binContents.size() >= minimumEntries) && (successiveFailedOfferings > 5))
-                || (size >= maximumSizeBytes) || (binContents.size() >= maximumEntries);
-    }
-
-    /**
-     * Indicates enough size exists to meet the minimum requirements
-     *
-     * @return true if full enough
-     */
-    public boolean isFullEnough() {
-        return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries));
-    }
-
-    /**
-     * Determines if this bin is older than the time specified.
-     *
-     * @param duration duration
-     * @param unit unit
-     * @return true if this bin is older than the length of time given; false otherwise
-     */
-    public boolean isOlderThan(final int duration, final TimeUnit unit) {
-        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
-        return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit);
-    }
-
-    /**
-     * Determines if this bin is older than the specified bin
-     *
-     * @param other other bin
-     * @return true if this is older than given bin
-     */
-    public boolean isOlderThan(final Bin other) {
-        return creationMomentEpochNs < other.creationMomentEpochNs;
-    }
-
-    /**
-     * If this bin has enough room for the size of the given flow file then it is added otherwise it is not
-     *
-     * @param flowFile flowfile to offer
-     * @param session the ProcessSession to which the FlowFile belongs
-     * @return true if added; false otherwise
-     */
-    public boolean offer(final FlowFile flowFile, final ProcessSession session) {
-        if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) {
-            successiveFailedOfferings++;
-            return false;
-        }
-
-        if (fileCountAttribute != null) {
-            final String countValue = flowFile.getAttribute(fileCountAttribute);
-            final Integer count = toInteger(countValue);
-            if (count != null) {
-                int currentMaxEntries = this.maximumEntries;
-                this.maximumEntries = Math.min(count, currentMaxEntries);
-                this.minimumEntries = currentMaxEntries;
-            }
-        }
-
-        size += flowFile.getSize();
-
-        session.migrate(getSession(), Collections.singleton(flowFile));
-        binContents.add(flowFile);
-        successiveFailedOfferings = 0;
-        return true;
-    }
-
-    private static final Pattern intPattern = Pattern.compile("\\d+");
-
-    public Integer toInteger(final String value) {
-        if (value == null) {
-            return null;
-        }
-        if (!intPattern.matcher(value).matches()) {
-            return null;
-        }
-
-        try {
-            return Integer.parseInt(value);
-        } catch (final Exception e) {
-            return null;
-        }
-    }
-
-    /**
-     * @return the underlying list of flow files within this bin
-     */
-    public List<FlowFile> getContents() {
-        return binContents;
-    }
-
-    public long getBinAge() {
-        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
-        return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS);
-    }
-}
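
The binning contract above is easiest to see in use. A hedged sketch of the offer loop a binning processor might run; the session and incoming FlowFile list are assumed to come from the framework.

import java.util.List;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.bin.Bin;

public class BinOfferSketch {
    static void fillBin(final Bin bin, final List<FlowFile> incoming, final ProcessSession session) {
        for (final FlowFile flowFile : incoming) {
            // offer() migrates the FlowFile into the bin's session on success and
            // rejects it when size or entry limits would be exceeded.
            if (!bin.offer(flowFile, session)) {
                break;
            }
        }
        if (bin.isFullEnough()) {
            // Minimum size and entry thresholds met: ready to bundle bin.getContents().
        }
    }
}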

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
deleted file mode 100644
index 67e37c2..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-/**
- * Base class for file-binning processors.
- *
- */
-public abstract class BinFiles extends AbstractSessionFactoryProcessor {
-
-    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
-            .name("Minimum Group Size")
-            .description("The minimum size of for the bundle")
-            .required(true)
-            .defaultValue("0 B")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
-            .name("Maximum Group Size")
-            .description("The maximum size for the bundle. If not specified, there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Minimum Number of Entries")
-            .description("The minimum number of files to include in a bundle")
-            .required(true)
-            .defaultValue("1")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Maximum Number of Entries")
-            .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
-            .defaultValue("1000")
-            .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
-            .name("Maximum number of Bins")
-            .description("Specifies the maximum number of bins that can be held in memory at any one time")
-            .defaultValue("5")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
-            .name("Max Bin Age")
-            .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
-                    + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
-            .required(false)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .build();
-
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-            .name("original")
-            .description("The FlowFiles that were used to create the bundle")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
-            .build();
-
-    private final BinManager binManager = new BinManager();
-    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
-
-    @OnStopped
-    public final void resetState() {
-        binManager.purge();
-
-        Bin bin;
-        while ((bin = readyBins.poll()) != null) {
-            bin.getSession().rollback();
-        }
-    }
-
-    /**
-     * Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
-     *
-     * @param context context
-     * @param session session
-     * @param flowFile flowFile
-     * @return The flow file, possibly altered
-     */
-    protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
-
-    /**
-     * Returns a group ID representing a bin. This allows flow files to be binned into like groups.
-     *
-     * @param context context
-     * @param flowFile flowFile
-     * @return The appropriate group ID
-     */
-    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
-
-    /**
-     * Performs any additional setup of the bin manager. Called during the OnScheduled phase.
-     *
-     * @param binManager The bin manager
-     * @param context context
-     */
-    protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);
-
-    /**
-     * Processes a single bin. The implementing class is responsible for committing each session.
-     *
-     * @param unmodifiableBin A reference to a single bin of flow files
-     * @param context The context
-     * @return <code>true</code> if the input bin was already committed. For example, in case of a failure the
-     *         implementation may choose to transfer all binned files to Failure and commit their sessions. If
-     *         <code>false</code>, the processBins() method will transfer the files to Original and commit the sessions
-     *
-     * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin
-     *             will be transferred to failure and the bin's ProcessSession committed
-     */
-    protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException;
-
-    /**
-     * Allows additional custom validation to be done. This will be called from this class's customValidate() method.
-     *
-     * @param context The context
-     * @return Validation results indicating problems
-     */
-    protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
-        return new ArrayList<>();
-    }
-
-    @Override
-    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        final int totalBinCount = binManager.getBinCount() + readyBins.size();
-        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
-        final int flowFilesBinned;
-
-        if (totalBinCount < maxBinCount) {
-            flowFilesBinned = binFlowFiles(context, sessionFactory);
-            getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
-        } else {
-            flowFilesBinned = 0;
-            getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
-                + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
-        }
-
-        if (!isScheduled()) {
-            return;
-        }
-
-        final int binsMigrated = migrateBins(context);
-        final int binsProcessed = processBins(context);
-        //If we accomplished nothing then let's yield
-        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
-            context.yield();
-        }
-    }
-
-    private int migrateBins(final ProcessContext context) {
-        int added = 0;
-        for (final Bin bin : binManager.removeReadyBins(true)) {
-            this.readyBins.add(bin);
-            added++;
-        }
-
-        // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
-        // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
-        // bins. So we may as well expire it now.
-        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
-            final Bin bin = binManager.removeOldestBin();
-            if (bin != null) {
-                added++;
-                this.readyBins.add(bin);
-            }
-        }
-        return added;
-    }
-
-    private int processBins(final ProcessContext context) {
-        final Bin bin = readyBins.poll();
-        if (bin == null) {
-            return 0;
-        }
-
-        final List<Bin> bins = new ArrayList<>();
-        bins.add(bin);
-
-        final ComponentLog logger = getLogger();
-
-        boolean binAlreadyCommitted = false;
-        try {
-            binAlreadyCommitted = this.processBin(bin, context);
-        } catch (final ProcessException e) {
-            logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e});
-
-            final ProcessSession binSession = bin.getSession();
-            for (final FlowFile flowFile : bin.getContents()) {
-                binSession.transfer(flowFile, REL_FAILURE);
-            }
-            binSession.commit();
-            return 1;
-        } catch (final Exception e) {
-            logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e});
-
-            bin.getSession().rollback();
-            return 1;
-        }
-
-        // If the implementation did not already commit the bin's session, route the contents to 'original' and commit here.
-        if (!binAlreadyCommitted) {
-            final ProcessSession binSession = bin.getSession();
-            binSession.transfer(bin.getContents(), REL_ORIGINAL);
-            binSession.commit();
-        }
-
-        return 1;
-    }
-
-    private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-        int flowFilesBinned = 0;
-        while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
-            if (!isScheduled()) {
-                break;
-            }
-
-            final ProcessSession session = sessionFactory.createSession();
-            final List<FlowFile> flowFiles = session.get(1000);
-            if (flowFiles.isEmpty()) {
-                break;
-            }
-
-            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
-            for (FlowFile flowFile : flowFiles) {
-                flowFile = this.preprocessFlowFile(context, session, flowFile);
-                final String groupingIdentifier = getGroupId(context, flowFile);
-                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
-            }
-
-            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
-                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
-                for (final FlowFile flowFile : unbinned) {
-                    // FlowFiles that can never fit a normal bin get an unbounded bin of their own
-                    Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-                    bin.offer(flowFile, session);
-                    this.readyBins.add(bin);
-                }
-                // count what was offered so the caller can tell whether any work was done
-                flowFilesBinned += entry.getValue().size();
-            }
-        }
-
-        return flowFilesBinned;
-    }
-
-    @OnScheduled
-    public final void onScheduled(final ProcessContext context) throws IOException {
-        binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-
-        if (context.getProperty(MAX_BIN_AGE).isSet()) {
-            binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
-        } else {
-            binManager.setMaxBinAge(Integer.MAX_VALUE);
-        }
-
-        if (context.getProperty(MAX_SIZE).isSet()) {
-            binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
-        } else {
-            binManager.setMaximumSize(Long.MAX_VALUE);
-        }
-
-        binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-
-        if (context.getProperty(MAX_ENTRIES).isSet()) {
-            binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
-        } else {
-            binManager.setMaximumEntries(Integer.MAX_VALUE);
-        }
-
-        this.setUpBinManager(binManager, context);
-    }
-
-    @Override
-    protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
-        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
-
-        final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
-        final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-
-        if (maxBytes != null && maxBytes.longValue() < minBytes) {
-            problems.add(
-                    new ValidationResult.Builder()
-                    .subject(MIN_SIZE.getName())
-                    .input(context.getProperty(MIN_SIZE).getValue())
-                    .valid(false)
-                    .explanation("Min Size must be less than or equal to Max Size")
-                    .build()
-            );
-        }
-
-        final Long min = context.getProperty(MIN_ENTRIES).asLong();
-        final Long max = context.getProperty(MAX_ENTRIES).asLong();
-
-        if (min != null && max != null) {
-            if (min > max) {
-                problems.add(
-                        new ValidationResult.Builder().subject(MIN_ENTRIES.getName())
-                        .input(context.getProperty(MIN_ENTRIES).getValue())
-                        .valid(false)
-                        .explanation("Min Entries must be less than or equal to Max Entries")
-                        .build()
-                );
-            }
-        }
-
-        Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
-        if (otherProblems != null) {
-            problems.addAll(otherProblems);
-        }
-
-        return problems;
-    }
-}
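
(For orientation, the abstract hooks above -- preprocessFlowFile, getGroupId,
setUpBinManager, processBin -- combine roughly as in the following hypothetical
subclass. The class name, the "group.id" attribute, and the no-op merge are
invented for illustration; they are not part of this commit.)

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.util.bin.Bin;
    import org.apache.nifi.processor.util.bin.BinFiles;
    import org.apache.nifi.processor.util.bin.BinManager;

    // Hypothetical processor that bins FlowFiles by a "group.id" attribute.
    public class ExampleBinningProcessor extends BinFiles {

        @Override
        protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
            return flowFile; // no per-file preparation needed in this sketch
        }

        @Override
        protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
            return flowFile.getAttribute("group.id"); // same value => same bin
        }

        @Override
        protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
            // nothing beyond the standard min/max properties in this sketch
        }

        @Override
        protected boolean processBin(final Bin bin, final ProcessContext context) throws ProcessException {
            final List<FlowFile> contents = new ArrayList<>(bin.getContents());
            getLogger().info("Would merge {} FlowFiles here", new Object[] {contents.size()});
            // Returning false lets BinFiles route the originals and commit the bin's session.
            return false;
        }
    }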

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
deleted file mode 100644
index d6a8567..0000000
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-
-/**
- * This class is thread safe
- *
- */
-public class BinManager {
-
-    private final AtomicLong minSizeBytes = new AtomicLong(0L);
-    private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE);
-    private final AtomicInteger minEntries = new AtomicInteger(0);
-    private final AtomicInteger maxEntries = new AtomicInteger(Integer.MAX_VALUE);
-    private final AtomicReference<String> fileCountAttribute = new AtomicReference<>(null);
-
-    private final AtomicInteger maxBinAgeSeconds = new AtomicInteger(Integer.MAX_VALUE);
-    private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
-    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock rLock = rwLock.readLock();
-    private final Lock wLock = rwLock.writeLock();
-
-    private int binCount = 0;   // guarded by read/write lock
-
-    public BinManager() {
-    }
-
-    public void purge() {
-        wLock.lock();
-        try {
-            for (final List<Bin> binList : groupBinMap.values()) {
-                for (final Bin bin : binList) {
-                    bin.getSession().rollback();
-                }
-            }
-            groupBinMap.clear();
-            binCount = 0;
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    public void setFileCountAttribute(final String fileCountAttribute) {
-        this.fileCountAttribute.set(fileCountAttribute);
-    }
-
-    public void setMinimumEntries(final int minimumEntries) {
-        this.minEntries.set(minimumEntries);
-    }
-
-    public void setMaximumEntries(final int maximumEntries) {
-        this.maxEntries.set(maximumEntries);
-    }
-
-    public int getBinCount() {
-        rLock.lock();
-        try {
-            return binCount;
-        } finally {
-            rLock.unlock();
-        }
-    }
-
-    public void setMinimumSize(final long numBytes) {
-        minSizeBytes.set(numBytes);
-    }
-
-    public void setMaximumSize(final long numBytes) {
-        maxSizeBytes.set(numBytes);
-    }
-
-    public void setMaxBinAge(final int seconds) {
-        maxBinAgeSeconds.set(seconds);
-    }
-
-    /**
-     * Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
-     * <p/>
-     *
-     * @param groupIdentifier the group to which the flow file belongs; can be null
-     * @param flowFile the flow file to bin
-     * @param session the ProcessSession to which the FlowFile belongs
-     * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to
-     *            create a new bin if necessary
-     * @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria
-     */
-    public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session, final ProcessSessionFactory sessionFactory) {
-        final long currentMaxSizeBytes = maxSizeBytes.get();
-        if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
-            return false;
-        }
-        wLock.lock();
-        try {
-            final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-            if (currentBins == null) { // this is a new group we need to register
-                final List<Bin> bins = new ArrayList<>();
-                final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                bins.add(bin);
-                groupBinMap.put(groupIdentifier, bins);
-                binCount++;
-                return bin.offer(flowFile, session);
-            } else {
-                for (final Bin bin : currentBins) {
-                    final boolean accepted = bin.offer(flowFile, session);
-                    if (accepted) {
-                        return true;
-                    }
-                }
-
-                //if we've reached this point then we couldn't fit it into any existing bins - we must create a new one
-                final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                currentBins.add(bin);
-                binCount++;
-                return bin.offer(flowFile, session);
-            }
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    /**
-     * Adds each of the given flowFiles to the first available bin in which it fits for the given group, creating a new bin in the specified group if necessary.
-     * <p/>
-     *
-     * @param groupIdentifier the group to which the flow file belongs; can be null
-     * @param flowFiles the flow files to bin
-     * @param session the ProcessSession to which the FlowFiles belong
-     * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to
-     *            create a new bin if necessary
-     * @return all of the FlowFiles that could not be successfully binned
-     */
-    public Set<FlowFile> offer(final String groupIdentifier, final Collection<FlowFile> flowFiles, final ProcessSession session, final ProcessSessionFactory sessionFactory) {
-        final long currentMaxSizeBytes = maxSizeBytes.get();
-        final Set<FlowFile> unbinned = new HashSet<>();
-
-        wLock.lock();
-        try {
-            flowFileLoop: for (final FlowFile flowFile : flowFiles) {
-                if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing)
-                    unbinned.add(flowFile);
-                    continue;
-                }
-
-                final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-                if (currentBins == null) { // this is a new group we need to register
-                    final List<Bin> bins = new ArrayList<>();
-                    final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    bins.add(bin);
-                    groupBinMap.put(groupIdentifier, bins);
-                    binCount++;
-
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
-                    }
-                    continue;
-                } else {
-                    for (final Bin bin : currentBins) {
-                        final boolean accepted = bin.offer(flowFile, session);
-                        if (accepted) {
-                            continue flowFileLoop;
-                        }
-                    }
-
-                    //if we've reached this point then we couldn't fit it into any existing bins - we must create a new one
-                    final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    currentBins.add(bin);
-                    binCount++;
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
-                    }
-
-                    continue;
-                }
-            }
-        } finally {
-            wLock.unlock();
-        }
-
-        return unbinned;
-    }
-
-    /**
-     * Finds all bins that are considered ready and removes them from the manager.
-     * <p/>
-     * @param relaxFullnessConstraint if false, bins must be full before they are considered ready; if true, bins need only meet their
-     * minimum size criteria or be 'old' to be considered ready
-     * @return the bins that were removed because they are considered ready
-     */
-    public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
-        final Map<String, List<Bin>> newGroupMap = new HashMap<>();
-        final List<Bin> readyBins = new ArrayList<>();
-
-        wLock.lock();
-        try {
-            for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
-                final List<Bin> remainingBins = new ArrayList<>();
-                for (final Bin bin : group.getValue()) {
-                    if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check
-                        readyBins.add(bin);
-                    } else if (!relaxFullnessConstraint && bin.isFull()) { //strict check
-                        readyBins.add(bin);
-                    } else { //it isn't time yet...
-                        remainingBins.add(bin);
-                    }
-                }
-                if (!remainingBins.isEmpty()) {
-                    newGroupMap.put(group.getKey(), remainingBins);
-                }
-            }
-            groupBinMap.clear();
-            groupBinMap.putAll(newGroupMap);
-            binCount -= readyBins.size();
-        } finally {
-            wLock.unlock();
-        }
-        return readyBins;
-    }
-
-    public Bin removeOldestBin() {
-        wLock.lock();
-        try {
-            Bin oldestBin = null;
-            String oldestBinGroup = null;
-
-            for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) {
-                for (final Bin bin : group.getValue()) {
-                    if (oldestBin == null || bin.isOlderThan(oldestBin)) {
-                        oldestBin = bin;
-                        oldestBinGroup = group.getKey();
-                    }
-                }
-            }
-
-            if (oldestBin == null) {
-                return null;
-            }
-
-            binCount--;
-            final List<Bin> bins = groupBinMap.get(oldestBinGroup);
-            bins.remove(oldestBin);
-            if (bins.isEmpty()) {
-                groupBinMap.remove(oldestBinGroup);
-            }
-            return oldestBin;
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    /**
-     * @return true if any current bins are older than the allowable max
-     */
-    public boolean containsOldBins() {
-        rLock.lock();
-        try {
-            for (final List<Bin> bins : groupBinMap.values()) {
-                for (final Bin bin : bins) {
-                    if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
-                        return true;
-                    }
-                }
-            }
-        } finally {
-            rLock.unlock();
-        }
-        return false;
-    }
-}
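
(Taken together, the lifecycle BinManager supports looks roughly like the
hypothetical driver below: configure thresholds, offer FlowFiles, then harvest
ready bins. The threshold values and the "my-group" identifier are illustrative
only; the session and sessionFactory would come from the hosting processor.)

    import java.util.Collection;
    import java.util.Set;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.ProcessSessionFactory;
    import org.apache.nifi.processor.util.bin.Bin;
    import org.apache.nifi.processor.util.bin.BinManager;

    public class BinManagerSketch {

        void binAndHarvest(final ProcessSession session, final ProcessSessionFactory sessionFactory,
                           final Collection<FlowFile> flowFiles) {
            final BinManager manager = new BinManager();
            manager.setMinimumEntries(10);                // illustrative thresholds
            manager.setMaximumEntries(1000);
            manager.setMinimumSize(1024L);                // bytes
            manager.setMaximumSize(10L * 1024 * 1024);    // bytes
            manager.setMaxBinAge(300);                    // seconds

            // Offer a batch under one group; FlowFiles too large to ever fit come back.
            final Set<FlowFile> unbinned = manager.offer("my-group", flowFiles, session, sessionFactory);
            // 'unbinned' FlowFiles need separate handling (e.g., a dedicated oversized bin).

            // relaxFullnessConstraint=true: bins that merely meet their minimums
            // (or have aged out) are harvested along with full ones.
            for (final Bin ready : manager.removeReadyBins(true)) {
                // a real caller would process ready.getContents() and commit ready.getSession()
            }
        }
    }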

