nifi-commits mailing list archives

From bbe...@apache.org
Subject [2/6] nifi git commit: NIFI-1868: Add PutHiveStreaming processor
Date Thu, 04 Aug 2016 14:06:19 GMT
NIFI-1868: Add PutHiveStreaming processor

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2019b93
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2019b93
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2019b93

Branch: refs/heads/master
Commit: c2019b9339c479d0c6a50fa7894a983d336640a3
Parents: cda4310
Author: Matt Burgess <mattyb149@apache.org>
Authored: Thu Jul 21 11:59:41 2016 -0400
Committer: Bryan Bende <bbende@apache.org>
Committed: Thu Aug 4 10:05:44 2016 -0400

----------------------------------------------------------------------
 .../nifi-hive-processors/pom.xml                |  28 +-
 .../nifi/dbcp/hive/HiveConnectionPool.java      |  75 +--
 .../nifi/processors/hive/ConvertAvroToORC.java  |   3 +-
 .../nifi/processors/hive/PutHiveStreaming.java  | 657 +++++++++++++++++++
 .../hive/AuthenticationFailedException.java     |  26 +
 .../apache/nifi/util/hive/HiveConfigurator.java |  96 +++
 .../apache/nifi/util/hive/HiveJdbcCommon.java   |  32 -
 .../org/apache/nifi/util/hive/HiveOptions.java  | 151 +++++
 .../org/apache/nifi/util/hive/HiveUtils.java    |  79 +++
 .../org/apache/nifi/util/hive/HiveWriter.java   | 440 +++++++++++++
 .../nifi/util/hive/ValidationResources.java     |  41 ++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/hive/TestPutHiveStreaming.java   | 588 +++++++++++++++++
 .../src/test/resources/core-site-security.xml   |  30 +
 .../src/test/resources/core-site.xml            |  22 +
 .../src/test/resources/fake.keytab              |   0
 .../src/test/resources/hive-site-security.xml   |  26 +
 .../src/test/resources/hive-site.xml            |  22 +
 .../src/test/resources/krb5.conf                |   0
 .../src/test/resources/user.avsc                |  25 +
 nifi-nar-bundles/nifi-hive-bundle/pom.xml       |  15 +
 21 files changed, 2270 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index f2e834f..1e5b0fd 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -26,8 +26,8 @@
     <packaging>jar</packaging>
 
     <properties>
-        <hive.version>2.0.0</hive.version>
-        <orc.version>1.1.1</orc.version>
+        <hive.version>2.0.1</hive.version>
+        <orc.version>1.1.2</orc.version>
     </properties>
 
 
@@ -145,6 +145,30 @@
             <version>${orc.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-streaming</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 9c4065d..c2ec43b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -30,13 +30,16 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hadoop.KerberosTicketRenewer;
 import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.hive.HiveJdbcCommon;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.ValidationResources;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -74,7 +77,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
                     + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
-            .required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
+            .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
 
     public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
             .name("hive-db-user")
@@ -116,10 +119,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .sensitive(false)
             .build();
 
-    static final long TICKET_RENEWAL_PERIOD = 60000;
-
-    private volatile UserGroupInformation ugi;
-    private volatile KerberosTicketRenewer renewer;
+    private static final long TICKET_RENEWAL_PERIOD = 60000;
 
     private final static List<PropertyDescriptor> properties;
     private static KerberosProperties kerberosProperties;
@@ -131,6 +131,8 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
 
     private volatile BasicDataSource dataSource;
 
+    private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+    private volatile UserGroupInformation ugi;
 
     static {
         kerberosProperties = KerberosProperties.create(NiFiProperties.getInstance());
@@ -160,22 +162,9 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
 
         if (confFileProvided) {
             final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
-            ValidationResources resources = validationResourceHolder.get();
-
-            // if no resources in the holder, or if the holder has different resources loaded,
-            // then load the Configuration and set the new resources in the holder
-            if (resources == null || !configFiles.equals(resources.getConfigResources())) {
-                getLogger().debug("Reloading validation resources");
-                resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
-                validationResourceHolder.set(resources);
-            }
-
-            final Configuration hiveConfig = resources.getConfiguration();
             final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
-            final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
-
-            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
-                    this.getClass().getSimpleName(), hiveConfig, principal, keytab, getLogger()));
+            final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+            problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger()));
         }
 
         return problems;
@@ -194,12 +183,14 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
      * @throws InitializationException if unable to create a database connection
      */
     @OnEnabled
-    public void onConfigured(final ConfigurationContext context) throws InitializationException, IOException {
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
 
         connectionUrl = context.getProperty(DATABASE_URL).getValue();
 
+        ComponentLog log = getLogger();
+
         final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
-        final Configuration hiveConfig = HiveJdbcCommon.getConfigurationFromFiles(configFiles);
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
 
         // add any dynamic properties to the Hive configuration
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
@@ -214,15 +205,15 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
             final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
 
-            getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
-            ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+            } catch (AuthenticationFailedException ae) {
+                log.error(ae.getMessage(), ae);
+            }
             getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
 
-            // if we got here then we have a ugi so start a renewer
-            if (ugi != null) {
-                final String id = getClass().getSimpleName();
-                renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger());
-            }
+
         }
         final String user = context.getProperty(DB_USER).getValue();
         final String passw = context.getProperty(DB_PASSWORD).getValue();
@@ -248,9 +239,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
     @OnDisabled
     public void shutdown() {
 
-        if (renewer != null) {
-            renewer.stop();
-        }
+        hiveConfigurator.stopRenewer();
 
         try {
             dataSource.close();
@@ -290,22 +279,4 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         return connectionUrl;
     }
 
-    private static class ValidationResources {
-        private final String configResources;
-        private final Configuration configuration;
-
-        public ValidationResources(String configResources, Configuration configuration) {
-            this.configResources = configResources;
-            this.configuration = configuration;
-        }
-
-        public String getConfigResources() {
-            return configResources;
-        }
-
-        public Configuration getConfiguration() {
-            return configuration;
-        }
-    }
-
 }
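
A note on consumption: the class declaration above is truncated at "implements Hiv...", but this pool implements the hive bundle's HiveDBCPService controller-service interface. As a rough illustration only (the interface itself is not part of this diff, and the helper class below is hypothetical), a downstream processor could borrow JDBC connections from an enabled pool like so:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import org.apache.nifi.dbcp.hive.HiveDBCPService;

    public class HiveConnectionPoolClientSketch {
        // A processor would normally obtain the service from its ProcessContext via
        // context.getProperty(...).asControllerService(HiveDBCPService.class).
        public static long countRows(final HiveDBCPService hivePool, final String table) throws SQLException {
            try (final Connection conn = hivePool.getConnection();
                 final Statement stmt = conn.createStatement();
                 final ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        }
    }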

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
index b0c3e95..2cec1d0 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.hive.HiveJdbcCommon;
+import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.orc.OrcFlowFileWriter;
 import org.apache.nifi.util.orc.OrcUtils;
 import org.apache.orc.CompressionKind;
@@ -98,7 +99,7 @@ public class ConvertAvroToORC extends AbstractProcessor {
             .displayName("ORC Configuration Resources")
             .description("A file or comma separated list of files which contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Please see the ORC documentation for more details.")
-            .required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
+            .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
 
     public static final PropertyDescriptor STRIPE_SIZE = new PropertyDescriptor.Builder()
             .name("orc-stripe-size")

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
new file mode 100644
index 0000000..e4515da
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.HiveWriter;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+/**
+ * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
+ */
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in "
+        + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). "
+        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
+})
+public class PutHiveStreaming extends AbstractProcessor {
+
+    // Attributes
+    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
+
+    // Validators
+    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        String reason = null;
+        try {
+            final int intVal = Integer.parseInt(value);
+
+            if (intVal < 2) {
+                reason = "value is less than 2";
+            }
+        } catch (final NumberFormatException e) {
+            reason = "value is not a valid integer";
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+    };
+
+    // Properties
+    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive-stream-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+                    + "Hive metastore is 9043.")
+            .required(true)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .build();
+
+    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .build();
+
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-database-name")
+            .displayName("Database Name")
+            .description("The name of the database in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
+            .name("hive-stream-partition-cols")
+            .displayName("Partition Columns")
+            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+                    + "correspond exactly to the order of partition columns specified during the table creation.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
+            .build();
+
+    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-autocreate-partition")
+            .displayName("Auto-Create Partitions")
+            .description("Flag indicating whether partitions should be automatically created")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-max-open-connections")
+            .displayName("Max Open Connections")
+            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
+                    + "or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
+            .name("hive-stream-heartbeat-interval")
+            .displayName("Heartbeat Interval")
+            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
+                    + "A value of 0 indicates that no heartbeat should be sent.")
+            .defaultValue("60")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(GREATER_THAN_ONE_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
+            .build();
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    private static final long TICKET_RENEWAL_PERIOD = 60000;
+
+    protected KerberosProperties kerberosProperties;
+
+    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+    protected volatile UserGroupInformation ugi;
+
+    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    protected HiveOptions options;
+    protected ExecutorService callTimeoutPool;
+    protected transient Timer heartBeatTimer;
+    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
+    protected Map<HiveEndPoint, HiveWriter> allWriters;
+
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        propertyDescriptors = new ArrayList<>();
+        propertyDescriptors.add(METASTORE_URI);
+        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
+        propertyDescriptors.add(DB_NAME);
+        propertyDescriptors.add(TABLE_NAME);
+        propertyDescriptors.add(PARTITION_COLUMNS);
+        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
+        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
+        propertyDescriptors.add(HEARTBEAT_INTERVAL);
+        propertyDescriptors.add(TXNS_PER_BATCH);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        kerberosProperties = getKerberosProperties();
+        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
+        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        ComponentLog log = getLogger();
+
+        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
+        final String dbName = context.getProperty(DB_NAME).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        options = new HiveOptions(metastoreUri, dbName, tableName)
+                .withTxnsPerBatch(txnsPerBatch)
+                .withAutoCreatePartitions(autoCreatePartitions)
+                .withMaxOpenConnections(maxConnections)
+                .withHeartBeatInterval(heartbeatInterval);
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+            } catch (AuthenticationFailedException ae) {
+                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
+            }
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+        }
+
+        allWriters = new ConcurrentHashMap<>();
+        String timeoutName = "put-hive-streaming-%d";
+        this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+        sendHeartBeat.set(true);
+        heartBeatTimer = new Timer();
+        setupHeartBeatTimer();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog log = getLogger();
+        try {
+            final List<String> partitionColumnList;
+            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
+            if (StringUtils.isEmpty(partitionColumns)) {
+                partitionColumnList = Collections.emptyList();
+            } else {
+                String[] partitionCols = partitionColumns.split(",");
+                partitionColumnList = new ArrayList<>(partitionCols.length);
+                for (String col : partitionCols) {
+                    partitionColumnList.add(col.trim());
+                }
+            }
+
+            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+            int recordCount = 0;
+            final List<HiveStreamingRecord> records = new LinkedList<>();
+
+            session.read(flowFile, in -> {
+
+                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                    GenericRecord currRecord;
+                    while (reader.hasNext()) {
+                        currRecord = reader.next();
+                        List<String> partitionValues = new ArrayList<>();
+
+                        for (String partition : partitionColumnList) {
+                            Object partitionValue = currRecord.get(partition);
+                            if (partitionValue == null) {
+                                throw new IOException("Partition column '" + partition + "' not found in Avro record");
+                            }
+                            partitionValues.add(partitionValue.toString());
+                        }
+
+                        List<Schema.Field> fields = currRecord.getSchema().getFields();
+                        if (fields != null) {
+                            JSONObject obj = new JSONObject();
+                            for (Schema.Field field : fields) {
+                                String fieldName = field.name();
+                                // Skip fields that are partition columns, we extracted those values above to create an EndPoint
+                                if (!partitionColumnList.contains(fieldName)) {
+                                    Object value = currRecord.get(fieldName);
+                                    try {
+                                        obj.put(fieldName, value);
+                                    } catch (JSONException je) {
+                                        throw new IOException(je);
+                                    }
+                                }
+                            }
+                            records.add(new HiveStreamingRecord(partitionValues, obj));
+                        }
+                    }
+                }
+            });
+
+            // Write all records to Hive Streaming
+            for (HiveStreamingRecord record : records) {
+                HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
+                HiveWriter writer = getOrCreateWriter(endPoint);
+                writer.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
+                recordCount++;
+            }
+
+            flowFile = session.putAttribute(flowFile, HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount));
+            flushAllWriters(true);
+
+            session.getProvenanceReporter().send(flowFile, options.getMetaStoreURI());
+            session.transfer(flowFile, REL_SUCCESS);
+
+            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
+            Thread.currentThread().setContextClassLoader(originalClassloader);
+
+        } catch (HiveWriter.CommitFailure commitFailure) {
+            log.error("Error committing to Hive", commitFailure);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure txnFailure) {
+            log.error("Hive Streaming Transaction Failure", txnFailure);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (InterruptedException e) {
+            log.error("Hive Streaming Interrupted, flow file will be penalized and routed to retry", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RETRY);
+        } catch (ConnectionError | HiveWriter.ConnectFailure ce) {
+            log.error("Error while connecting via Hive Streaming, flow file will be penalized and routed to retry", ce);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_RETRY);
+        } catch (SerializationError se) {
+            log.error("Serialization exception occurred, record not written to Hive.", se);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (HiveWriter.WriteFailure wf) {
+            log.error("Error while writing record to Hive Streaming", wf);
+            abortAndCloseWriters();
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    @OnStopped
+    public void cleanup() {
+        ComponentLog log = getLogger();
+        sendHeartBeat.set(false);
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                HiveWriter w = entry.getValue();
+                w.flushAndClose();
+            } catch (Exception ex) {
+                log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", ex);
+                if (ex instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        callTimeoutPool.shutdown();
+        try {
+            while (!callTimeoutPool.isTerminated()) {
+                callTimeoutPool.awaitTermination(
+                        options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+            }
+        } catch (Throwable t) {
+            log.warn("shutdown interrupted on " + callTimeoutPool, t);
+        }
+
+        callTimeoutPool = null;
+    }
+
+    private void setupHeartBeatTimer() {
+        if (options.getHeartBeatInterval() > 0) {
+            final ComponentLog log = getLogger();
+            heartBeatTimer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        if (sendHeartBeat.get()) {
+                            log.debug("Start sending heartbeat on all writers");
+                            sendHeartBeatOnAllWriters();
+                            setupHeartBeatTimer();
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to heartbeat on HiveWriter ", e);
+                    }
+                }
+            }, options.getHeartBeatInterval() * 1000);
+        }
+    }
+
+    private void sendHeartBeatOnAllWriters() throws InterruptedException {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.heartBeat();
+        }
+    }
+
+    private void flushAllWriters(boolean rollToNext)
+            throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
+        for (HiveWriter writer : allWriters.values()) {
+            writer.flush(rollToNext);
+        }
+    }
+
+    private void abortAndCloseWriters() {
+        try {
+            abortAllWriters();
+            closeAllWriters();
+        } catch (Exception ie) {
+            getLogger().warn("unable to close hive connections. ", ie);
+        }
+    }
+
+    /**
+     * Abort current Txn on all writers
+     */
+    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                entry.getValue().abort();
+            } catch (Exception e) {
+                getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e);
+            }
+        }
+    }
+
+    /**
+     * Closes all writers and remove them from cache
+     */
+    private void closeAllWriters() {
+        //1) Retire writers
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                getLogger().warn("unable to close writers. ", e);
+            }
+        }
+        //2) Clear cache
+        allWriters.clear();
+    }
+
+    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
+        ComponentLog log = getLogger();
+        try {
+            HiveWriter writer = allWriters.get(endPoint);
+            if (writer == null) {
+                log.debug("Creating Writer to Hive end point : " + endPoint);
+                writer = makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
+                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{allWriters.size(), options.getMaxOpenConnections()});
+                    int retired = retireIdleWriters();
+                    if (retired == 0) {
+                        retireEldestWriter();
+                    }
+                }
+                allWriters.put(endPoint, writer);
+                HiveUtils.logAllHiveEndPoints(allWriters);
+            }
+            return writer;
+        } catch (HiveWriter.ConnectFailure e) {
+            log.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Locate writer that has not been used for longest time and retire it
+     */
+    private void retireEldestWriter() {
+        ComponentLog log = getLogger();
+
+        log.info("Attempting close eldest writers");
+        long oldestTimeStamp = System.currentTimeMillis();
+        HiveEndPoint eldest = null;
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            if (entry.getValue().getLastUsed() < oldestTimeStamp) {
+                eldest = entry.getKey();
+                oldestTimeStamp = entry.getValue().getLastUsed();
+            }
+        }
+        try {
+            log.info("Closing least used Writer to Hive end point : " + eldest);
+            allWriters.remove(eldest).flushAndClose();
+        } catch (IOException e) {
+            log.warn("Failed to close writer for end point: " + eldest, e);
+        } catch (InterruptedException e) {
+            log.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            log.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
+        }
+    }
+
+    /**
+     * Locate all writers past idle timeout and retire them
+     *
+     * @return number of writers retired
+     */
+    private int retireIdleWriters() {
+        ComponentLog log = getLogger();
+
+        log.info("Attempting to close idle HiveWriters");
+        int count = 0;
+        long now = System.currentTimeMillis();
+        ArrayList<HiveEndPoint> retirees = new ArrayList<>();
+
+        //1) Find retirement candidates
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+                ++count;
+                retirees.add(entry.getKey());
+            }
+        }
+        //2) Retire them
+        for (HiveEndPoint ep : retirees) {
+            try {
+                log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
+                allWriters.remove(ep).flushAndClose();
+            } catch (IOException e) {
+                log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, e);
+            } catch (InterruptedException e) {
+                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, e);
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, e);
+            }
+        }
+        return count;
+    }
+
+    protected HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions options) throws ConnectionError {
+        return HiveUtils.makeEndPoint(partitionValues, options);
+    }
+
+    protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
+            throws HiveWriter.ConnectFailure, InterruptedException {
+        return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+    }
+
+    protected KerberosProperties getKerberosProperties() {
+        return KerberosProperties.create(NiFiProperties.getInstance());
+    }
+
+    protected class HiveStreamingRecord {
+
+        private List<String> partitionValues;
+        private JSONObject record;
+
+        public HiveStreamingRecord(List<String> partitionValues, JSONObject record) {
+            this.partitionValues = partitionValues;
+            this.record = record;
+        }
+
+        public List<String> getPartitionValues() {
+            return partitionValues;
+        }
+
+        public JSONObject getRecord() {
+            return record;
+        }
+
+    }
+}
+
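
For readers unfamiliar with what the HiveWriter and HiveUtils classes added below wrap, here is a minimal standalone sketch of the hive-hcatalog-streaming client API that this processor ultimately drives: one HiveEndPoint per partition, a strict-JSON record writer, and a transaction batch sized by the Transactions per Batch property. The metastore URI, table, partition value, and record are placeholders, and the target table must already satisfy the Hive Streaming requirements (transactional, bucketed, stored as ORC).

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;
    import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    public class HiveStreamingClientSketch {
        public static void main(String[] args) throws Exception {
            // One endpoint per (database, table, partition) combination
            final HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore-host:9083",
                    "default", "users", Collections.singletonList("2016"));
            final StreamingConnection connection = endPoint.newConnection(true); // auto-create the partition
            final StrictJsonWriter jsonWriter = new StrictJsonWriter(endPoint);
            // 100 mirrors the processor's default Transactions per Batch
            final TransactionBatch txnBatch = connection.fetchTransactionBatch(100, jsonWriter);
            try {
                txnBatch.beginNextTransaction();
                txnBatch.write("{\"name\":\"Joe\",\"favorite_number\":146}".getBytes(StandardCharsets.UTF_8));
                txnBatch.commit();
            } finally {
                txnBatch.close();
                connection.close();
            }
        }
    }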

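The protected factory methods above (makeHiveEndPoint, makeHiveWriter, getKerberosProperties) give the new TestPutHiveStreaming (listed in the diffstat but not included in this message) a seam for exercising the processor with the nifi-mock framework and no live metastore. A hedged sketch of that pattern, assuming Mockito is available to the test module and that HiveWriter and NiFiProperties are mockable:

    import static org.mockito.Mockito.mock;

    import java.util.concurrent.ExecutorService;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.nifi.hadoop.KerberosProperties;
    import org.apache.nifi.processors.hive.PutHiveStreaming;
    import org.apache.nifi.util.NiFiProperties;
    import org.apache.nifi.util.hive.HiveOptions;
    import org.apache.nifi.util.hive.HiveWriter;

    public class MockedPutHiveStreaming extends PutHiveStreaming {
        @Override
        protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool,
                                            UserGroupInformation ugi, HiveOptions options) {
            return mock(HiveWriter.class); // records are "written" to a mock, no connection is opened
        }

        @Override
        protected KerberosProperties getKerberosProperties() {
            return KerberosProperties.create(mock(NiFiProperties.class)); // avoid reading nifi.properties
        }
    }

A test could then drive the processor through nifi-mock, for example:

    final TestRunner runner = TestRunners.newTestRunner(new MockedPutHiveStreaming());
    runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
    runner.setProperty(PutHiveStreaming.DB_NAME, "default");
    runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
    runner.enqueue(avroBytes); // an Avro data file matching the test user.avsc schema
    runner.run();
    runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
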
http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/AuthenticationFailedException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/AuthenticationFailedException.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/AuthenticationFailedException.java
new file mode 100644
index 0000000..2b19f2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/AuthenticationFailedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.hive;
+
+/**
+ * Created by mburgess on 5/4/16.
+ */
+public class AuthenticationFailedException extends Exception {
+    public AuthenticationFailedException(String reason, Exception cause) {
+        super(reason, cause);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
new file mode 100644
index 0000000..748847d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.KerberosTicketRenewer;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Created by mburgess on 5/4/16.
+ */
+public class HiveConfigurator {
+
+    private volatile KerberosTicketRenewer renewer;
+
+
+    public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
+
+        final List<ValidationResult> problems = new ArrayList<>();
+        ValidationResources resources = validationResourceHolder.get();
+
+        // if no resources in the holder, or if the holder has different resources loaded,
+        // then load the Configuration and set the new resources in the holder
+        if (resources == null || !configFiles.equals(resources.getConfigResources())) {
+            log.debug("Reloading validation resources");
+            resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
+            validationResourceHolder.set(resources);
+        }
+
+        final Configuration hiveConfig = resources.getConfiguration();
+
+        problems.addAll(KerberosProperties.validatePrincipalAndKeytab(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, log));
+
+        return problems;
+    }
+
+    public Configuration getConfigurationFromFiles(final String configFiles) {
+        final Configuration hiveConfig = new HiveConf();
+        if (StringUtils.isNotBlank(configFiles)) {
+            for (final String configFile : configFiles.split(",")) {
+                hiveConfig.addResource(new Path(configFile.trim()));
+            }
+        }
+        return hiveConfig;
+    }
+
+    public UserGroupInformation authenticate(final Configuration hiveConfig, String principal, String keyTab, long ticketRenewalPeriod, ComponentLog log) throws AuthenticationFailedException {
+
+        UserGroupInformation ugi;
+        try {
+            ugi = SecurityUtil.loginKerberos(hiveConfig, principal, keyTab);
+        } catch (IOException ioe) {
+            throw new AuthenticationFailedException("Kerberos Authentication for Hive failed", ioe);
+        }
+
+        // if we got here then we have a ugi so start a renewer
+        if (ugi != null) {
+            final String id = getClass().getSimpleName();
+            renewer = SecurityUtil.startTicketRenewalThread(id, ugi, ticketRenewalPeriod, log);
+        }
+        return ugi;
+    }
+
+    public void stopRenewer() {
+        if (renewer != null) {
+            renewer.stop();
+        }
+    }
+}
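
Taken together, HiveConfigurator centralizes the Kerberos handling that HiveConnectionPool previously did inline. A minimal caller-side sketch of the intended lifecycle (the enable/disable method names below are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.util.hive.AuthenticationFailedException;
    import org.apache.nifi.util.hive.HiveConfigurator;

    public class HiveConfiguratorLifecycleSketch {
        private static final long TICKET_RENEWAL_PERIOD = 60000;

        private final HiveConfigurator hiveConfigurator = new HiveConfigurator();
        private volatile UserGroupInformation ugi;

        // Mirrors HiveConnectionPool.onConfigured() / PutHiveStreaming.setup()
        public void enable(String configFiles, String principal, String keyTab, ComponentLog log)
                throws AuthenticationFailedException {
            final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
            // Logs in via SecurityUtil and starts a KerberosTicketRenewer internally
            ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
        }

        // Mirrors HiveConnectionPool.shutdown()
        public void disable() {
            hiveConfigurator.stopRenewer(); // no-op if authenticate() was never called
        }
    }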

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
index 284eabb..5de36dc 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
@@ -29,11 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigDecimal;
@@ -347,34 +343,6 @@ public class HiveJdbcCommon {
         void processRow(ResultSet resultSet) throws IOException;
     }
 
-    /**
-     * Validates that one or more files exist, as specified in a single property.
-     */
-    public static Validator createMultipleFilesExistValidator() {
-        return new Validator() {
-
-            @Override
-            public ValidationResult validate(String subject, String input, ValidationContext context) {
-                final String[] files = input.split(",");
-                for (String filename : files) {
-                    try {
-                        final File file = new File(filename.trim());
-                        final boolean valid = file.exists() && file.isFile();
-                        if (!valid) {
-                            final String message = "File " + file + " does not exist or is not a file";
-                            return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
-                        }
-                    } catch (SecurityException e) {
-                        final String message = "Unable to access " + filename + " due to " + e.getMessage();
-                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
-                    }
-                }
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            }
-
-        };
-    }
-
     public static Configuration getConfigurationFromFiles(final String configFiles) {
         final Configuration hiveConfig = new HiveConf();
         if (StringUtils.isNotBlank(configFiles)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
new file mode 100644
index 0000000..2c37380
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.hive;
+
+import java.io.Serializable;
+
+
+public class HiveOptions implements Serializable {
+    /**
+     * Default tick interval in seconds (carried over from the Storm Hive bolt, where it is half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
+     */
+    private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+
+    protected String databaseName;
+    protected String tableName;
+    protected String metaStoreURI;
+    protected Integer txnsPerBatch = 100;
+    protected Integer maxOpenConnections = 10;
+    protected Integer batchSize = 15000;
+    protected Integer idleTimeout = 60000;
+    protected Integer callTimeout = 0;
+    protected Integer heartBeatInterval = 60;
+    protected Boolean autoCreatePartitions = true;
+    protected String kerberosPrincipal;
+    protected String kerberosKeytab;
+    protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
+
+    public HiveOptions(String metaStoreURI, String databaseName, String tableName) {
+        this.metaStoreURI = metaStoreURI;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    public HiveOptions withTickTupleInterval(Integer tickInterval) {
+        this.tickTupleInterval = tickInterval;
+        return this;
+    }
+
+    public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
+        this.txnsPerBatch = txnsPerBatch;
+        return this;
+    }
+
+    public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
+        this.maxOpenConnections = maxOpenConnections;
+        return this;
+    }
+
+    public HiveOptions withBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HiveOptions withIdleTimeout(Integer idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    public HiveOptions withCallTimeout(Integer callTimeout) {
+        this.callTimeout = callTimeout;
+        return this;
+    }
+
+    public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+        return this;
+    }
+
+    public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
+        this.autoCreatePartitions = autoCreatePartitions;
+        return this;
+    }
+
+    public HiveOptions withKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+        return this;
+    }
+
+    public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+        return this;
+    }
+
+    public String getMetaStoreURI() {
+        return metaStoreURI;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public Integer getCallTimeOut() {
+        return callTimeout;
+    }
+
+    public Integer getHeartBeatInterval() {
+        return heartBeatInterval;
+    }
+
+    public Integer getMaxOpenConnections() {
+        return maxOpenConnections;
+    }
+
+    public Integer getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public Integer getTxnsPerBatch() {
+        return txnsPerBatch;
+    }
+
+    public Boolean getAutoCreatePartitions() {
+        return autoCreatePartitions;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+
+    public Integer getTickTupleInterval() {
+        return tickTupleInterval;
+    }
+}
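
For orientation, a minimal usage sketch of the fluent setters above (not part of this patch; the metastore URI, database, and table names are placeholders):

    // Illustrative only -- connection values below are placeholders
    HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "users")
            .withTxnsPerBatch(100)
            .withBatchSize(10000)
            .withCallTimeout(30000)          // milliseconds; 0 waits indefinitely
            .withHeartBeatInterval(60)       // seconds between keep-alive heartbeats
            .withAutoCreatePartitions(true);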

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
new file mode 100644
index 0000000..ad34226
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.hive;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+public class HiveUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
+
+    public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
+        if (partitionVals == null) {
+            return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
+        }
+        return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
+    }
+
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
+        throws HiveWriter.ConnectFailure, InterruptedException {
+        return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
+                              options.getCallTimeOut(), callTimeoutPool, ugi);
+    }
+
+    public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            LOG.info("cached writer for end point {} : {}", entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Validates that one or more files exist, as specified in a single property.
+     */
+    public static Validator createMultipleFilesExistValidator() {
+        return (subject, input, context) -> {
+            final String[] files = input.split("\\s*,\\s*");
+            for (String filename : files) {
+                try {
+                    final File file = new File(filename.trim());
+                    final boolean valid = file.exists() && file.isFile();
+                    if (!valid) {
+                        final String message = "File " + file + " does not exist or is not a file";
+                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                    }
+                } catch (SecurityException e) {
+                    final String message = "Unable to access " + filename + " due to " + e.getMessage();
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                }
+            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        };
+    }
+}
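
As a rough illustration (not part of this patch), the file-existence validator above could back a NiFi property that accepts a comma-separated list of configuration files; the class and property names here are hypothetical:

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.util.hive.HiveUtils;

    public class ExampleHiveProperties {
        // Hypothetical property -- shows how the validator is wired into a PropertyDescriptor
        public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
                .name("Hive Configuration Resources")
                .description("Comma-separated paths to configuration files such as hive-site.xml and core-site.xml")
                .required(false)
                .addValidator(HiveUtils.createMultipleFilesExistValidator())
                .build();
    }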

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java
new file mode 100644
index 0000000..15cf978
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HiveWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class);
+
+    private final HiveEndPoint endPoint;
+    private final StreamingConnection connection;
+    private final int txnsPerBatch;
+    private final RecordWriter recordWriter;
+    private final ExecutorService callTimeoutPool;
+    private final long callTimeout;
+    private final Object txnBatchLock = new Object();
+    private TransactionBatch txnBatch;
+    private long lastUsed; // time of last flush on this writer
+    protected boolean closed; // flag indicating HiveWriter was closed
+    private boolean autoCreatePartitions;
+    private UserGroupInformation ugi;
+    private int totalRecords = 0;
+
+    public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+                      boolean autoCreatePartitions, long callTimeout,
+                      ExecutorService callTimeoutPool, UserGroupInformation ugi)
+            throws InterruptedException, ConnectFailure {
+        try {
+            this.autoCreatePartitions = autoCreatePartitions;
+            this.callTimeout = callTimeout;
+            this.callTimeoutPool = callTimeoutPool;
+            this.endPoint = endPoint;
+            this.ugi = ugi;
+            this.connection = newConnection(ugi);
+            this.txnsPerBatch = txnsPerBatch;
+            this.recordWriter = getRecordWriter(endPoint);
+            this.txnBatch = nextTxnBatch(recordWriter);
+            this.closed = false;
+            this.lastUsed = System.currentTimeMillis();
+        } catch (InterruptedException | RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException {
+        return new StrictJsonWriter(endPoint);
+    }
+
+    @Override
+    public String toString() {
+        return "{ "
+                + "endPoint = " + endPoint.toString()
+                + ", TransactionBatch = " + txnBatch.toString() + " }";
+    }
+
+    /**
+     * Write the record data to Hive
+     *
+     * @throws WriteFailure if an error occurs while writing the record to Hive
+     * @throws InterruptedException if the write operation is interrupted
+     */
+    public synchronized void write(final byte[] record)
+            throws WriteFailure, SerializationError, InterruptedException {
+        if (closed) {
+            throw new IllegalStateException("This hive streaming writer was closed " +
+                    "and thus no longer able to write : " + endPoint);
+        }
+        // write the tuple
+        try {
+            LOG.debug("Writing event to {}", endPoint);
+            callWithTimeout(new CallRunner<Void>() {
+                @Override
+                public Void call() throws StreamingException, InterruptedException {
+                    txnBatch.write(record);
+                    totalRecords++;
+                    return null;
+                }
+            });
+        } catch (SerializationError se) {
+            throw new SerializationError(endPoint.toString() + " SerializationError", se);
+        } catch (StreamingException | TimeoutException e) {
+            throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    /**
+     * Commits the current Txn if totalRecords > 0.
+     * If 'rollToNext' is true, will switch to the next Txn in the batch, or to a
+     * new TxnBatch if the current Txn batch is exhausted.
+     */
+    public void flush(boolean rollToNext)
+            throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
+        // if there are no records do not call flush
+        if (totalRecords <= 0) return;
+        try {
+            synchronized (txnBatchLock) {
+                commitTxn();
+                nextTxn(rollToNext);
+                totalRecords = 0;
+                lastUsed = System.currentTimeMillis();
+            }
+        } catch (StreamingException e) {
+            throw new TxnFailure(txnBatch, e);
+        }
+    }
+
+    /** Sends a heartbeat on the current transaction batch to keep its open
+     *  transactions alive; heartbeat errors are logged and suppressed
+     */
+    public void heartBeat() throws InterruptedException {
+        // 1) schedule the heartbeat on one thread in pool
+        synchronized (txnBatchLock) {
+            try {
+                callWithTimeout(new CallRunner<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        try {
+                            LOG.info("Sending heartbeat on batch " + txnBatch);
+                            txnBatch.heartbeat();
+                        } catch (StreamingException e) {
+                            LOG.warn("Heartbeat error on batch " + txnBatch, e);
+                        }
+                        return null;
+                    }
+                });
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+                // Suppressing exceptions as we don't care for errors on heartbeats
+            }
+        }
+    }
+
+    /**
+     * Returns the number of records written so far in the current transaction
+     * @return totalRecords
+     */
+    public int getTotalRecords() {
+        return totalRecords;
+    }
+
+    /**
+     * Flush and Close current transactionBatch.
+     */
+    public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure,
+            IOException, InterruptedException {
+        flush(false);
+        close();
+    }
+
+    /**
+     * Close the Transaction Batch and connection
+     * @throws IOException if an error occurs during close
+     * @throws InterruptedException if the close operation is interrupted
+     */
+    public void close() throws IOException, InterruptedException {
+        closeTxnBatch();
+        closeConnection();
+        closed = true;
+    }
+
+    protected void closeConnection() throws InterruptedException {
+        LOG.info("Closing connection to end point : {}", endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    connection.close(); // could block
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on connection close
+        }
+    }
+
+    protected void commitTxn() throws CommitFailure, InterruptedException {
+        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId(), endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    txnBatch.commit(); // could block
+                    return null;
+                }
+            });
+        } catch (StreamingException | TimeoutException e) {
+            throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
+        }
+    }
+
+    protected StreamingConnection newConnection(final UserGroupInformation ugi)
+            throws InterruptedException, ConnectFailure {
+        try {
+            return callWithTimeout(() -> {
+                return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
+            });
+        } catch (StreamingException | TimeoutException e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+    protected TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+            throws InterruptedException, TxnBatchFailure {
+        LOG.debug("Fetching new Txn Batch for {}", endPoint);
+        TransactionBatch batch = null;
+        try {
+            batch = callWithTimeout(() -> {
+                return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+            });
+            batch.beginNextTransaction();
+            LOG.debug("Acquired {}. Switching to first txn", batch);
+        } catch (TimeoutException | StreamingException e) {
+            throw new TxnBatchFailure(endPoint, e);
+        }
+        return batch;
+    }
+
+    protected void closeTxnBatch() throws InterruptedException {
+        try {
+            LOG.debug("Closing Txn Batch {}", txnBatch);
+            callWithTimeout(new CallRunner<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    if (txnBatch != null) {
+                        txnBatch.close(); // could block
+                    }
+                    return null;
+                }
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Error closing txn batch " + txnBatch, e);
+        }
+    }
+
+    /**
+     * Aborts the current Txn and switches to the next Txn.
+     * @throws StreamingException if a new Transaction Batch could not be acquired, or the switch to the next Txn fails
+     */
+    public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
+        synchronized (txnBatchLock) {
+            abortTxn();
+            nextTxn(true); // roll to next
+        }
+    }
+
+
+    /**
+     * Aborts current Txn in the txnBatch.
+     */
+    protected void abortTxn() throws InterruptedException {
+        LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+        try {
+            callWithTimeout(new CallRunner<Void>() {
+                @Override
+                public Void call() throws StreamingException, InterruptedException {
+                    txnBatch.abort(); // could block
+                    return null;
+                }
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+        } catch (Exception e) {
+            LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+            // Suppressing exceptions as we don't care for errors on abort
+        }
+    }
+
+
+    /**
+     * If the current txnBatch is exhausted, closes it and, when rollToNext is true, fetches a new txnBatch;
+     * otherwise, when rollToNext is true, switches to the next Txn in the current batch.
+     * @param rollToNext Whether to roll to the next transaction batch
+     */
+    protected void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
+        if (txnBatch.remainingTransactions() == 0) {
+            closeTxnBatch();
+            txnBatch = null;
+            if (rollToNext) {
+                txnBatch = nextTxnBatch(recordWriter);
+            }
+        } else if (rollToNext) {
+            LOG.debug("Switching to next Txn for {}", endPoint);
+            txnBatch.beginNextTransaction(); // does not block
+        }
+    }
+
+    /**
+     * If the current thread has been interrupted, then throws an
+     * exception.
+     * @throws InterruptedException if the current thread has been interrupted
+     */
+    protected static void checkAndThrowInterruptedException()
+            throws InterruptedException {
+        if (Thread.interrupted()) {
+            throw new InterruptedException("Timed out before Hive call was made. "
+                    + "Your callTimeout might be set too low or Hive calls are "
+                    + "taking too long.");
+        }
+    }
+
+    /**
+     * Executes the callable on a separate thread and waits for completion
+     * for the specified callTimeout in milliseconds. In case of timeout,
+     * cancels the callable and throws a TimeoutException.
+     */
+    private <T> T callWithTimeout(final CallRunner<T> callRunner)
+            throws TimeoutException, StreamingException, InterruptedException {
+        Future<T> future = callTimeoutPool.submit(callRunner::call);
+        try {
+            if (callTimeout > 0) {
+                return future.get(callTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                return future.get();
+            }
+        } catch (TimeoutException eT) {
+            future.cancel(true);
+            throw eT;
+        } catch (ExecutionException e1) {
+            Throwable cause = e1.getCause();
+            if (cause instanceof IOException) {
+                throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+            } else if (cause instanceof StreamingException) {
+                throw (StreamingException) cause;
+            } else if (cause instanceof InterruptedException) {
+                throw (InterruptedException) cause;
+            } else if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else if (cause instanceof TimeoutException) {
+                throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+            } else {
+                throw new RuntimeException(e1);
+            }
+        }
+    }
+
+    public long getLastUsed() {
+        return lastUsed;
+    }
+
+    private byte[] generateRecord(List<String> tuple) {
+        StringBuilder buf = new StringBuilder();
+        for (String o : tuple) {
+            buf.append(o);
+            buf.append(",");
+        }
+        return buf.toString().getBytes();
+    }
+
+    /**
+     * Simple interface whose <tt>call</tt> method is executed by
+     * {@link #callWithTimeout(CallRunner)} on a separate thread from the
+     * call-timeout pool.
+     * @param <T> the type of object returned from the call
+     */
+    private interface CallRunner<T> {
+        T call() throws Exception;
+    }
+
+    public static class Failure extends Exception {
+        public Failure(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    public static class WriteFailure extends Failure {
+        public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+            super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+        }
+    }
+
+    public static class CommitFailure extends Failure {
+        public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+            super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
+        }
+    }
+
+    public static class ConnectFailure extends Failure {
+        public ConnectFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed connecting to EndPoint " + ep, cause);
+        }
+    }
+
+    public static class TxnBatchFailure extends Failure {
+        public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
+            super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+        }
+    }
+
+    public static class TxnFailure extends Failure {
+        public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+            super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+        }
+    }
+}
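
A condensed sketch of the writer lifecycle built from the methods above, assuming an unsecured cluster (null UGI) and placeholder connection values; error handling is reduced to the enclosing throws clause for brevity:

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.nifi.util.hive.HiveOptions;
    import org.apache.nifi.util.hive.HiveUtils;
    import org.apache.nifi.util.hive.HiveWriter;

    public class HiveWriterLifecycleSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder metastore/database/table -- not part of this commit
            HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "users");
            ExecutorService callTimeoutPool = Executors.newFixedThreadPool(1);
            try {
                HiveEndPoint endPoint = HiveUtils.makeEndPoint(null, options);   // no partition values
                // null UGI assumes a non-Kerberos cluster
                HiveWriter writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, null, options);
                writer.write("{\"name\":\"Joe\",\"favorite_number\":146}".getBytes(StandardCharsets.UTF_8));
                writer.flushAndClose();   // commit outstanding records and release the connection
            } finally {
                callTimeoutPool.shutdown();
            }
        }
    }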

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/ValidationResources.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/ValidationResources.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/ValidationResources.java
new file mode 100644
index 0000000..1014efb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/ValidationResources.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.hive;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A helper class for maintaining loaded configurations (to avoid reloading on use unless necessary)
+ */
+public class ValidationResources {
+
+    private final String configResources;
+    private final Configuration configuration;
+
+    public ValidationResources(String configResources, Configuration configuration) {
+        this.configResources = configResources;
+        this.configuration = configuration;
+    }
+
+    public String getConfigResources() {
+        return configResources;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}
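
A sketch of the reload-avoidance pattern this holder supports (the field and method outside ValidationResources are hypothetical):

    // Hypothetical caller -- caches the parsed Configuration keyed by its resource string
    private volatile ValidationResources validationResourceHolder;

    private Configuration getOrLoadConfiguration(String configFiles) {
        ValidationResources resources = validationResourceHolder;
        // reload only when the configured resource list has changed
        if (resources == null || !configFiles.equals(resources.getConfigResources())) {
            resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
            validationResourceHolder = resources;
        }
        return resources.getConfiguration();
    }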

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2019b93/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index cc25947..45e1f29 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
 org.apache.nifi.processors.hive.SelectHiveQL
 org.apache.nifi.processors.hive.PutHiveQL
 org.apache.nifi.processors.hive.ConvertAvroToORC
+org.apache.nifi.processors.hive.PutHiveStreaming

