nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jw...@apache.org
Subject [3/3] nifi git commit: NIFI-3950 Refactor AWS bundle
Date Sat, 21 Oct 2017 21:21:33 GMT
NIFI-3950 Refactor AWS bundle

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2140.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c894246e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c894246e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c894246e

Branch: refs/heads/master
Commit: c894246e036b7733a4ca3808aaea203b9a82812e
Parents: 2acf6bd
Author: Christopher Currie <christopher@currie.com>
Authored: Thu Oct 19 08:23:02 2017 -0700
Committer: James Wing <jvwing@gmail.com>
Committed: Sat Oct 21 12:58:40 2017 -0700

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-aws-abstract-processors/pom.xml        |  79 +++++
 ...AbstractAWSCredentialsProviderProcessor.java | 110 ++++++
 .../processors/aws/AbstractAWSProcessor.java    | 280 +++++++++++++++
 .../factory/CredentialPropertyDescriptors.java  | 181 ++++++++++
 .../aws/dynamodb/AbstractDynamoDBProcessor.java | 344 ++++++++++++++++++
 .../AbstractWriteDynamoDBProcessor.java         |  66 ++++
 .../nifi/processors/aws/dynamodb/ItemKeys.java  |  53 +++
 .../kinesis/AbstractBaseKinesisProcessor.java   |  98 ++++++
 .../AbstractKinesisFirehoseProcessor.java       |  81 +++++
 .../stream/AbstractKinesisStreamProcessor.java  |  65 ++++
 .../aws/lambda/AbstractAWSLambdaProcessor.java  |  72 ++++
 .../processors/aws/s3/AbstractS3Processor.java  | 295 ++++++++++++++++
 .../aws/sns/AbstractSNSProcessor.java           |  75 ++++
 .../aws/sqs/AbstractSQSProcessor.java           |  69 ++++
 .../nifi-aws-bundle/nifi-aws-nar/pom.xml        |   3 +-
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |  10 +
 ...AbstractAWSCredentialsProviderProcessor.java | 110 ------
 .../processors/aws/AbstractAWSProcessor.java    | 280 ---------------
 .../factory/CredentialPropertyDescriptors.java  | 181 ----------
 .../service/AWSCredentialsProviderService.java  |  44 ---
 .../aws/dynamodb/AbstractDynamoDBProcessor.java | 348 -------------------
 .../AbstractWriteDynamoDBProcessor.java         |  69 ----
 .../nifi/processors/aws/dynamodb/ItemKeys.java  |  53 ---
 .../kinesis/AbstractBaseKinesisProcessor.java   |  98 ------
 .../AbstractKinesisFirehoseProcessor.java       |  81 -----
 .../stream/AbstractKinesisStreamProcessor.java  |  65 ----
 .../aws/lambda/AbstractAWSLambdaProcessor.java  |  72 ----
 .../processors/aws/s3/AbstractS3Processor.java  | 295 ----------------
 .../aws/sns/AbstractSNSProcessor.java           |  75 ----
 .../aws/sqs/AbstractSQSProcessor.java           |  69 ----
 .../aws/cloudwatch/ITPutCloudWatchMetric.java   |   2 +-
 .../nifi-aws-service-api-nar/pom.xml            |  45 +++
 .../src/main/resources/META-INF/LICENSE         | 232 +++++++++++++
 .../src/main/resources/META-INF/NOTICE          |  65 ++++
 .../nifi-aws-service-api/pom.xml                |  38 ++
 .../service/AWSCredentialsProviderService.java  |  44 +++
 nifi-nar-bundles/nifi-aws-bundle/pom.xml        |  19 +
 pom.xml                                         |   6 +
 39 files changed, 2335 insertions(+), 1842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 7710b3d..b9cd707 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -293,6 +293,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-aws-service-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ambari-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
new file mode 100644
index 0000000..edc291e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>nifi-aws-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-aws-abstract-processors</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-dynamodb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-kinesis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-lambda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-sns</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-sqs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-aws-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
new file mode 100644
index 0000000..c64ed07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * Base class for aws processors that uses AWSCredentialsProvider interface for creating aws clients.
+ *
+ * @param <ClientType> client type
+ *
+ * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+ */
+public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient>
+    extends AbstractAWSProcessor<ClientType>  {
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .description("The Controller Service that is used to obtain aws credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    /**
+     * This method checks if {#link {@link #AWS_CREDENTIALS_PROVIDER_SERVICE} is available and if it
+     * is, uses the credentials provider, otherwise it invokes the {@link AbstractAWSProcessor#onScheduled(ProcessContext)}
+     * which uses static AWSCredentials for the aws processors
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+        if (service != null) {
+            getLogger().debug("Using aws credentials provider service for creating client");
+            onScheduledUsingControllerService(context);
+        } else {
+            getLogger().debug("Using aws credentials for creating client");
+            super.onScheduled(context);
+        }
+    }
+
+    /**
+     * Create aws client using credentials provider
+     * @param context the process context
+     */
+    protected void onScheduledUsingControllerService(ProcessContext context) {
+        final ClientType awsClient = createClient(context, getCredentialsProvider(context), createConfiguration(context));
+        this.client = awsClient;
+        super.initializeRegionAndEndpoint(context);
+
+     }
+
+    @OnShutdown
+    public void onShutDown() {
+        if ( this.client != null ) {
+           this.client.shutdown();
+        }
+    }
+
+    /**
+     * Get credentials provider using the {@link AWSCredentialsProviderService}
+     * @param context the process context
+     * @return AWSCredentialsProvider the credential provider
+     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     */
+    protected AWSCredentialsProvider getCredentialsProvider(final ProcessContext context) {
+
+        final AWSCredentialsProviderService awsCredentialsProviderService =
+              context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
+
+        return awsCredentialsProviderService.getCredentialsProvider();
+
+    }
+
+    /**
+     * Abstract method to create aws client using credentials provider.  This is the preferred method
+     * for creating aws clients
+     * @param context process context
+     * @param credentialsProvider aws credentials provider
+     * @param config aws client configuration
+     * @return ClientType the client
+     */
+    protected abstract ClientType createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
new file mode 100644
index 0000000..5d244c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.ssl.SSLContextService;
+
+/**
+ * Abstract base class for aws processors.  This class uses aws credentials for creating aws clients
+ *
+ * @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead which uses credentials providers or creating aws clients
+ * @see AbstractAWSCredentialsProviderProcessor
+ *
+ */
+@Deprecated
+public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success relationship").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure relationship").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
+    public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
+    public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
+
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("Proxy host name or IP")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Host Port")
+            .description("Proxy host port")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    protected volatile ClientType client;
+    protected volatile Region region;
+
+    // If protocol is changed to be a property, ensure other uses are also changed
+    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+
+    private static AllowableValue createAllowableValue(final Regions regions) {
+        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
+    }
+
+    private static AllowableValue[] getAvailableRegions() {
+        final List<AllowableValue> values = new ArrayList<>();
+        for (final Regions regions : Regions.values()) {
+            values.add(createAllowableValue(regions));
+        }
+
+        return values.toArray(new AllowableValue[values.size()]);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
+            problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+        final boolean proxyHostPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        if ( ((!proxyHostSet) && proxyHostPortSet) || (proxyHostSet && (!proxyHostPortSet)) ) {
+            problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build());
+        }
+
+        return problems;
+    }
+
+    protected ClientConfiguration createConfiguration(final ProcessContext context) {
+        final ClientConfiguration config = new ClientConfiguration();
+        config.setMaxConnections(context.getMaxConcurrentTasks());
+        config.setMaxErrorRetry(0);
+        config.setUserAgent(DEFAULT_USER_AGENT);
+        // If this is changed to be a property, ensure other uses are also changed
+        config.setProtocol(DEFAULT_PROTOCOL);
+        final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        config.setConnectionTimeout(commsTimeout);
+        config.setSocketTimeout(commsTimeout);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+            // NIFI-3788: Changed hostnameVerifier from null to DHV (BrowserCompatibleHostnameVerifier is deprecated)
+            SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, new DefaultHostnameVerifier());
+            config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
+        }
+
+        if (context.getProperty(PROXY_HOST).isSet()) {
+            String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+            config.setProxyHost(proxyHost);
+            Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
+            config.setProxyPort(proxyPort);
+        }
+
+        return config;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
+        this.client = awsClient;
+        initializeRegionAndEndpoint(context);
+    }
+
+    protected void initializeRegionAndEndpoint(ProcessContext context) {
+        // if the processor supports REGION, get the configured region.
+        if (getSupportedPropertyDescriptors().contains(REGION)) {
+            final String region = context.getProperty(REGION).getValue();
+            if (region != null) {
+                this.region = Region.getRegion(Regions.fromName(region));
+                client.setRegion(this.region);
+            } else {
+                this.region = null;
+            }
+        }
+
+        // if the endpoint override has been configured, set the endpoint.
+        // (per Amazon docs this should only be configured at client creation)
+        if (getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
+            if (!urlstr.isEmpty()) {
+                this.client.setEndpoint(urlstr);
+            }
+        }
+    }
+
+    /**
+     * Create client from the arguments
+     * @param context process context
+     * @param credentials static aws credentials
+     * @param config aws client configuration
+     * @return ClientType aws client
+     *
+     * @deprecated use {@link AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)}
+     */
+    @Deprecated
+    protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
+
+    protected ClientType getClient() {
+        return client;
+    }
+
+    protected Region getRegion() {
+        return region;
+    }
+
+    protected AWSCredentials getCredentials(final ProcessContext context) {
+        final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
+        final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
+
+        final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
+
+        if (credentialsFile != null) {
+            try {
+                return new PropertiesCredentials(new File(credentialsFile));
+            } catch (final IOException ioe) {
+                throw new ProcessException("Could not read Credentials File", ioe);
+            }
+        }
+
+        if (accessKey != null && secretKey != null) {
+            return new BasicAWSCredentials(accessKey, secretKey);
+        }
+
+        return new AnonymousAWSCredentials();
+
+    }
+
+    @OnShutdown
+    public void onShutdown() {
+        if ( getClient() != null ) {
+            getClient().shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/CredentialPropertyDescriptors.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/CredentialPropertyDescriptors.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/CredentialPropertyDescriptors.java
new file mode 100644
index 0000000..920ec32
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/CredentialPropertyDescriptors.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.credentials.provider.factory;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Shared definitions of properties that specify various AWS credentials.
+ *
+ * @see <a href="http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html">
+ *     Providing AWS Credentials in the AWS SDK for Java</a>
+ */
+public class CredentialPropertyDescriptors {
+
+    /**
+     * Specifies use of the Default Credential Provider Chain
+     *
+     * @see <a href="http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1">
+     *     AWS SDK: Default Credential Provider Chain
+     *     </a>
+     */
+    public static final PropertyDescriptor USE_DEFAULT_CREDENTIALS = new PropertyDescriptor.Builder()
+            .name("default-credentials")
+            .displayName("Use Default Credentials")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .sensitive(false)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .description("If true, uses the Default Credential chain, including EC2 instance profiles or roles, " +
+                "environment variables, default user credentials, etc.")
+            .build();
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
+            .name("Credentials File")
+            .displayName("Credentials File")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .description("Path to a file containing AWS access key and secret key in properties file format.")
+            .build();
+
+    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("Access Key")
+            .displayName("Access Key")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("Secret Key")
+            .displayName("Secret Key")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    /**
+     * Specifies use of a named profile credential.
+     *
+     * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/profile/ProfileCredentialsProvider.html">
+     *     ProfileCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor PROFILE_NAME = new PropertyDescriptor.Builder()
+            .name("profile-name")
+            .displayName("Profile Name")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .description("The AWS profile name for credentials from the profile configuration file.")
+            .build();
+
+    public static final PropertyDescriptor USE_ANONYMOUS_CREDENTIALS = new PropertyDescriptor.Builder()
+            .name("anonymous-credentials")
+            .displayName("Use Anonymous Credentials")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .sensitive(false)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .description("If true, uses Anonymous credentials")
+            .build();
+
+    /**
+     * AWS Role Arn used for cross account access
+     *
+     * @see <a href="http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns">AWS ARN</a>
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_ARN = new PropertyDescriptor.Builder()
+            .name("Assume Role ARN")
+            .displayName("Assume Role ARN")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .description("The AWS Role ARN for cross account access. This is used in conjunction with role name and session timeout")
+            .build();
+
+    /**
+     * The role name while creating aws role
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_NAME = new PropertyDescriptor.Builder()
+            .name("Assume Role Session Name")
+            .displayName("Assume Role Session Name")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .description("The AWS Role Name for cross account access. This is used in conjunction with role ARN and session time out")
+            .build();
+
+    /**
+     * Max session time for role based credentials. The range is between 900 and 3600 seconds.
+     */
+    public static final PropertyDescriptor MAX_SESSION_TIME = new PropertyDescriptor.Builder()
+            .name("Session Time")
+            .description("Session time for role based session (between 900 and 3600 seconds). This is used in conjunction with role ARN and name")
+            .defaultValue("3600")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * The ExternalId used while creating aws role.
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_EXTERNAL_ID = new PropertyDescriptor.Builder()
+            .name("assume-role-external-id")
+            .displayName("Assume Role External ID")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .description("External ID for cross-account access. This is used in conjunction with role arn, " +
+                "role name, and optional session time out")
+            .build();
+
+    /**
+     * Assume Role Proxy variables for configuring proxy to retrieve keys
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("assume-role-proxy-host")
+            .displayName("Assume Role Proxy Host")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .description("Proxy host for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account")
+            .build();
+
+    public static final PropertyDescriptor ASSUME_ROLE_PROXY_PORT = new PropertyDescriptor.Builder()
+            .name("assume-role-proxy-port")
+            .displayName("Assume Role Proxy Port")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .description("Proxy pot for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account")
+            .build();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
new file mode 100644
index 0000000..aed957f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+
+/**
+ * Base class for NiFi dynamo db related processors
+ */
+public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonDynamoDBClient> {
+
+    public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed")
+            .description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process "
+               + "all the items in the request. Typical reasons are insufficient table throughput capacity and exceeding the maximum bytes per request. "
+               + "Unprocessed FlowFiles can be retried with a new request.").build();
+
+    public static final AllowableValue ALLOWABLE_VALUE_STRING = new AllowableValue("string");
+    public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new AllowableValue("number");
+
+    public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = "dynamodb.key.error.unprocessed";
+    public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = "dynmodb.range.key.value.error";
+    public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = "dynmodb.hash.key.value.error";
+    public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = "dynamodb.key.error.not.found";
+    public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message";
+    public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
+    public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message";
+    public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
+    public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service";
+    public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable";
+    public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id";
+    public static final String DYNAMODB_ERROR_STATUS_CODE = "dynamodb.error.status.code";
+    public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = "  dynamodb.item.hash.key.value";
+    public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = "  dynamodb.item.range.key.value";
+    public static final String DYNAMODB_ITEM_IO_ERROR = "dynamodb.item.io.error";
+    public static final String AWS_DYNAMO_DB_ITEM_SIZE_ERROR = "dynamodb.item.size.error";
+
+    protected static final String DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE = "DynamoDB key not found : ";
+
+    public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The DynamoDB table name")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key value of the item")
+            .defaultValue("${dynamodb.item.hash.key.value}")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Range Key Value")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${dynamodb.item.range.key.value}")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value Type")
+            .required(true)
+            .description("The hash key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Range Key Value Type")
+            .required(true)
+            .description("The range key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Hash Key Name")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key name of the item")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Range Key Name")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The range key name of the item")
+            .build();
+
+    public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
+            .name("Json Document attribute")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Json document to be retrieved from the dynamodb item")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch items for each request (between 1 and 50)")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createLongValidator(1, 50, true))
+            .defaultValue("1")
+            .description("The items to be retrieved in one batch")
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_CHARSET = new PropertyDescriptor.Builder()
+            .name("Character set of document")
+            .description("Character set of data in the document")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue(Charset.defaultCharset().name())
+            .build();
+
+    protected volatile DynamoDB dynamoDB;
+
+    public static final Set<Relationship> dynamoDBrelationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return dynamoDBrelationships;
+    }
+
+    /**
+     * Create client using credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().debug("Creating client with credentials provider");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentialsProvider, config);
+
+        return client;
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().debug("Creating client with aws credentials");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentials, config);
+
+        return client;
+    }
+
+    protected Object getValue(ProcessContext context, PropertyDescriptor type, PropertyDescriptor value, FlowFile flowFile) {
+        if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            return context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+        }
+    }
+
+    protected Object getAttributeValue(ProcessContext context, PropertyDescriptor propertyType, AttributeValue value) {
+        if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            if ( value == null ) return null;
+            else return value.getS();
+        } else {
+            if ( value == null ) return null;
+            else return new BigDecimal(value.getN());
+        }
+    }
+
+    protected synchronized DynamoDB getDynamoDB() {
+        if ( dynamoDB == null )
+            dynamoDB = new DynamoDB(client);
+        return dynamoDB;
+    }
+
+    protected Object getValue(Map<String, AttributeValue> item, String keyName, String valueType) {
+        if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getS();
+        } else {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getN();
+        }
+    }
+
+    protected List<FlowFile> processException(final ProcessSession session, List<FlowFile> flowFiles, Exception exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processClientException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonClientException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processServiceException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonServiceException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
+            attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() );
+            attributes.put(DYNAMODB_ERROR_TYPE, exception.getErrorType().name() );
+            attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
+            attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    /**
+     * Send unhandled items to failure and remove the flow files from key to flow file map
+     * @param session used for sending the flow file
+     * @param keysToFlowFileMap - ItemKeys to flow file map
+     * @param hashKeyValue the items hash key value
+     * @param rangeKeyValue the items hash key value
+     */
+    protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object hashKeyValue, Object rangeKeyValue) {
+        ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
+
+        FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+        flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
+        session.transfer(flowFile,REL_UNPROCESSED);
+
+        getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
+
+        keysToFlowFileMap.remove(itemKeys);
+    }
+
+    protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object rangeKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+        boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
+        boolean isRangeValueNull = rangeKeyValue == null;
+        boolean isConsistent = true;
+        if ( ! isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+        if ( isRangeNameBlank &&  ( ! isRangeValueNull && ! StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+
+        if ( ! isConsistent ) {
+            getLogger().error("Range key name '" + rangeKeyName + "' was not consistent with range value "
+                + rangeKeyValue + "'" + flowFile);
+            flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+                 + "'/value '" + rangeKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        return isConsistent;
+
+    }
+
+    protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+
+        boolean isConsistent = true;
+
+        if ( hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) {
+            getLogger().error("Hash key value '" + hashKeyValue + "' is required for flow file " + flowFile);
+                 flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
+                     + "/value '" + hashKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+            isConsistent = false;
+        }
+
+        return isConsistent;
+
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.dynamoDB = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
new file mode 100644
index 0000000..fc88037
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+public abstract class AbstractWriteDynamoDBProcessor extends AbstractDynamoDBProcessor {
+
+    /**
+     * Helper method to handle unprocessed items items
+     * @param session process session
+     * @param keysToFlowFileMap map of flow db primary key to flow file
+     * @param table dynamodb table
+     * @param hashKeyName the hash key name
+     * @param hashKeyValueType the hash key value
+     * @param rangeKeyName the range key name
+     * @param rangeKeyValueType range key value
+     * @param outcome the write outcome
+     */
+    protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
+            final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
+        BatchWriteItemResult result = outcome.getBatchWriteItemResult();
+
+        // Handle unprocessed items
+        List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
+        if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
+            for ( WriteRequest request : unprocessedItems) {
+                Map<String,AttributeValue> item = getRequestItem(request);
+                Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
+                Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
+
+                sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+            }
+        }
+    }
+
+    /**
+     * Get the request item key and attribute value
+     * @param writeRequest write request
+     * @return Map of keys and values
+     */
+    protected abstract Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
new file mode 100644
index 0000000..9e53457
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Utility class to keep a map of keys and flow files
+ */
+class ItemKeys {
+
+    protected Object hashKey = "";
+    protected Object rangeKey = "";
+
+    public ItemKeys(Object hashKey, Object rangeKey) {
+        if ( hashKey != null )
+            this.hashKey = hashKey;
+        if ( rangeKey != null )
+            this.rangeKey = rangeKey;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this,ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this, false);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return EqualsBuilder.reflectionEquals(this, other, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
new file mode 100644
index 0000000..e559820
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonWebServiceClient;
+
+/**
+ * This class provides processor the base class for kinesis client
+ */
+public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
+    extends AbstractAWSCredentialsProviderProcessor<ClientType> {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .displayName("Message Batch Size")
+            .name("message-batch-size")
+            .description("Batch size for messages (1-500).")
+            .defaultValue("250")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 500, true))
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
+            .name("max-message-buffer-size")
+            .displayName("Max message buffer size (MB)")
+            .description("Max message buffer size in Mega-bytes")
+            .defaultValue("1 MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Max buffer size 1 MB
+     */
+    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
+
+    protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
+            final String streamName, String message) {
+        flowFileCandidate = session.putAttribute(flowFileCandidate, message,
+            "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
+        session.transfer(flowFileCandidate, REL_FAILURE);
+        getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes",
+            new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE});
+        return flowFileCandidate;
+    }
+
+    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) {
+        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
+
+        long currentBufferSizeBytes = 0;
+
+        for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
+
+            FlowFile flowFileCandidate = session.get();
+            if ( flowFileCandidate == null )
+                break;
+
+            if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
+                flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message);
+                continue;
+            }
+
+            currentBufferSizeBytes += flowFileCandidate.getSize();
+
+            flowFiles.add(flowFileCandidate);
+        }
+        return flowFiles;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
new file mode 100644
index 0000000..ca15653
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisFirehoseClient> {
+
+    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+            .name("Amazon Kinesis Firehose Delivery Stream Name")
+            .description("The name of kinesis firehose delivery stream")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("Batch size for messages (1-500).")
+            .defaultValue("250")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 500, true))
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
+            .name("Max message buffer size")
+            .description("Max message buffer")
+            .defaultValue("1 MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AmazonKinesisFirehoseClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentails
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        return new AmazonKinesisFirehoseClient(credentials, config);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
new file mode 100644
index 0000000..b7513af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+
+/**
+ * This class provides processor the base class for kinesis client
+ */
+public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
+
+    public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
+            .name("kinesis-stream-name")
+            .displayName("Amazon Kinesis Stream Name")
+            .description("The name of Kinesis Stream")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AmazonKinesisClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentails
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        return new AmazonKinesisClient(credentials, config);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c894246e/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
new file mode 100644
index 0000000..758191d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.lambda;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.lambda.AWSLambdaClient;
+
+/**
+ * This class is the base class for invoking aws lambda function
+ */
+public abstract class AbstractAWSLambdaProcessor extends AbstractAWSCredentialsProviderProcessor<AWSLambdaClient> {
+
+    public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_NAME = new PropertyDescriptor.Builder()
+            .name("Amazon Lambda Name")
+            .description("The Lambda Function Name")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("Amazon Lambda Qualifier (version)")
+            .description("The Lambda Function Version")
+            .defaultValue("$LATEST")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AWSLambdaClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        return new AWSLambdaClient(credentials, config);
+    }
+}
\ No newline at end of file


Mime
View raw message