nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [2/2] nifi git commit: NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors
Date Fri, 08 Apr 2016 15:23:23 GMT
NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors

This closes #224.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3155a8a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3155a8a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3155a8a

Branch: refs/heads/master
Commit: e3155a8a4ba49bb6c4f324ebf090a92d6dd97389
Parents: 41583e6
Author: mans2singh <mans2singh@yahoo.com>
Authored: Fri Apr 8 11:21:39 2016 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Fri Apr 8 11:23:04 2016 -0400

----------------------------------------------------------------------
 .../processors/aws/AbstractAWSProcessor.java    |   7 +
 .../aws/dynamodb/AbstractDynamoDBProcessor.java | 338 ++++++++++++
 .../AbstractWriteDynamoDBProcessor.java         |  69 +++
 .../processors/aws/dynamodb/DeleteDynamoDB.java | 161 ++++++
 .../processors/aws/dynamodb/GetDynamoDB.java    | 197 +++++++
 .../nifi/processors/aws/dynamodb/ItemKeys.java  |  53 ++
 .../processors/aws/dynamodb/PutDynamoDB.java    | 197 +++++++
 .../org.apache.nifi.processor.Processor         |   5 +-
 .../aws/dynamodb/DeleteDynamoDBTest.java        | 368 +++++++++++++
 .../aws/dynamodb/GetDynamoDBTest.java           | 530 +++++++++++++++++++
 .../aws/dynamodb/ITAbstractDynamoDBTest.java    | 139 +++++
 .../dynamodb/ITPutGetDeleteGetDynamoDBTest.java | 428 +++++++++++++++
 .../processors/aws/dynamodb/ItemKeysTest.java   |  68 +++
 .../aws/dynamodb/PutDynamoDBTest.java           | 460 ++++++++++++++++
 14 files changed, 3019 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index a2eb9df..e2c2196 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -287,4 +288,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
 
     }
 
+    @OnShutdown
+    public void onShutdown() {
+        if ( getClient() != null ) {
+            getClient().shutdown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
new file mode 100644
index 0000000..df1a9ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+
+/**
+ * Base class for Nifi dynamo db related processors
+ *
+ * @see DeleteDynamoDB
+ * @see PutDynamoDB
+ * @see GetDynamoDB
+ */
+public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonDynamoDBClient> {
+
+    public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed")
+            .description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process "
+               + "all the items in the request. Typical reasons are insufficient table throughput capacity and exceeding the maximum bytes per request. "
+               + "Unprocessed FlowFiles can be retried with a new request.").build();
+
+    public static final AllowableValue ALLOWABLE_VALUE_STRING = new AllowableValue("string");
+    public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new AllowableValue("number");
+
+    public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = "dynamodb.key.error.unprocessed";
+    public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = "dynmodb.range.key.value.error";
+    public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = "dynmodb.hash.key.value.error";
+    public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = "dynamodb.key.error.not.found";
+    public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message";
+    public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
+    public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message";
+    public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
+    public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service";
+    public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable";
+    public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id";
+    public static final String DYNAMODB_ERROR_STATUS_CODE = "dynamodb.error.status.code";
+    public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = "  dynamodb.item.hash.key.value";
+    public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = "  dynamodb.item.range.key.value";
+    public static final String DYNAMODB_ITEM_IO_ERROR = "dynamodb.item.io.error";
+    public static final String AWS_DYNAMO_DB_ITEM_SIZE_ERROR = "dynamodb.item.size.error";
+
+    protected static final String DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE = "DynamoDB key not found : ";
+
+    public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The DynamoDB table name")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key value of the item")
+            .defaultValue("${dynamodb.item.hash.key.value}")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Range Key Value")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${dynamodb.item.range.key.value}")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value Type")
+            .required(true)
+            .description("The hash key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Range Key Value Type")
+            .required(true)
+            .description("The range key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Hash Key Name")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key name of the item")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Range Key Name")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The range key name of the item")
+            .build();
+
+    public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
+            .name("Json Document attribute")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Json document to be retrieved from the dynamodb item")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch items for each request (between 1 and 50)")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createLongValidator(1, 50, true))
+            .defaultValue("1")
+            .description("The items to be retrieved in one batch")
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_CHARSET = new PropertyDescriptor.Builder()
+            .name("Character set of document")
+            .description("Character set of data in the document")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .defaultValue(Charset.defaultCharset().name())
+            .build();
+
+    protected volatile DynamoDB dynamoDB;
+
+    public static final Set<Relationship> dynamoDBrelationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return dynamoDBrelationships;
+    }
+
+    /**
+     * Create client using credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().debug("Creating client with credentials provider");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentialsProvider, config);
+
+        return client;
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().debug("Creating client with aws credentials");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentials, config);
+
+        return client;
+    }
+
+    protected Object getValue(ProcessContext context, PropertyDescriptor type, PropertyDescriptor value, FlowFile flowFile) {
+        if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            return context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+        }
+    }
+
+    protected Object getAttributeValue(ProcessContext context, PropertyDescriptor propertyType, AttributeValue value) {
+        if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            if ( value == null ) return null;
+            else return value.getS();
+        } else {
+            if ( value == null ) return null;
+            else return new BigDecimal(value.getN());
+        }
+    }
+
+    protected synchronized DynamoDB getDynamoDB() {
+        if ( dynamoDB == null )
+            dynamoDB = new DynamoDB(client);
+        return dynamoDB;
+    }
+
+    protected Object getValue(Map<String, AttributeValue> item, String keyName, String valueType) {
+        if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getS();
+        } else {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getN();
+        }
+    }
+
+    protected List<FlowFile> processException(final ProcessSession session, List<FlowFile> flowFiles, Exception exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processClientException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonClientException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processServiceException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonServiceException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
+            attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() );
+            attributes.put(DYNAMODB_ERROR_TYPE, exception.getErrorType().name() );
+            attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
+            attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    /**
+     * Send unhandled items to failure and remove the flow files from key to flow file map
+     * @param session used for sending the flow file
+     * @param keysToFlowFileMap - ItemKeys to flow file map
+     * @param hashKeyValue the items hash key value
+     * @param rangeKeyValue the items hash key value
+     */
+    protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object hashKeyValue, Object rangeKeyValue) {
+        ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
+
+        FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+        flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
+        session.transfer(flowFile,REL_UNPROCESSED);
+
+        getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
+
+        keysToFlowFileMap.remove(itemKeys);
+    }
+
+    protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object rangeKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+        boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
+        boolean isRangeValueNull = rangeKeyValue == null;
+        boolean isConsistent = true;
+        if ( ! isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+        if ( isRangeNameBlank &&  ( ! isRangeValueNull && ! StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+
+        if ( ! isConsistent ) {
+            getLogger().error("Range key name '" + rangeKeyName + "' was not consistent with range value "
+                + rangeKeyValue + "'" + flowFile);
+            flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+                 + "'/value '" + rangeKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        return isConsistent;
+
+    }
+
+    protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+
+        boolean isConsistent = true;
+
+        if ( hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) {
+            getLogger().error("Hash key value '" + hashKeyValue + "' is required for flow file " + flowFile);
+                 flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
+                     + "/value '" + hashKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+            isConsistent = false;
+        }
+
+        return isConsistent;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
new file mode 100644
index 0000000..e7ac523
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+public abstract class AbstractWriteDynamoDBProcessor extends AbstractDynamoDBProcessor {
+
+    /**
+     * Helper method to handle unprocessed items items
+     * @param session process session
+     * @param keysToFlowFileMap map of flow db primary key to flow file
+     * @param table dynamodb table
+     * @param hashKeyName the hash key name
+     * @param hashKeyValueType the hash key value
+     * @param rangeKeyName the range key name
+     * @param rangeKeyValueType range key value
+     * @param outcome the write outcome
+     */
+    protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
+            final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
+        BatchWriteItemResult result = outcome.getBatchWriteItemResult();
+
+        // Handle unprocessed items
+        List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
+        if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
+            for ( WriteRequest request : unprocessedItems) {
+                Map<String,AttributeValue> item = getRequestItem(request);
+                Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
+                Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
+
+                sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+            }
+        }
+    }
+
+    /**
+     * Get the request item key and attribute value
+     * @param writeRequest write request
+     * @return Map of keys and values
+     *
+     * @see PutDynamoDB#getRequestItem(WriteRequest)
+     * @see DeleteDynamoDB#getRequestItem(WriteRequest)
+     */
+    protected abstract Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
new file mode 100644
index 0000000..a82de34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"})
+@CapabilityDescription("Deletes a document from DynamoDB based on hash and range key. The key can be string or number."
+        + " The request requires all the primary keys for the operation (hash or hash and range key)")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value" ),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value" ),
+    })
+public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+                HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
+                CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
+                continue;
+            }
+
+            if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableWriteItems.addHashOnlyPrimaryKeysToDelete(hashKeyName, hashKeyValue);
+            } else {
+                tableWriteItems.addHashAndRangePrimaryKeyToDelete(hashKeyName,
+                        hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+        }
+
+        if ( keysToFlowFileMap.isEmpty() ) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
+               rangeKeyValueType, outcome);
+
+            // All non unprocessed items are successful
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successfully deleted item from dynamodb : " + table);
+                session.transfer(flowFile,REL_SUCCESS);
+            }
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
+        return writeRequest.getDeleteRequest().getKey();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
new file mode 100644
index 0000000..808600c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves a document from DynamoDB based on hash and range key.  The key can be string or number."
+        + "For any get request all the primary keys are required (hash or hash and range based on the table keys)."
+        + "A Json Document ('Map') attribute of the DynamoDB item is read into the content of the FlowFile.")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value" ),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value" ),
+    })
+public class GetDynamoDB extends AbstractDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+                HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
+                CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("FlowFiles are routed to not found relationship if key not found in the table").build();
+
+    public static final Set<Relationship> getDynamoDBrelationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED, REL_NOT_FOUND)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return getDynamoDBrelationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+        TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
+                continue;
+            }
+
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+
+            if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue);
+            } else {
+                tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);
+
+            // Handle processed items and get the json document
+            List<Item> items = result.getTableItems().get(table);
+            for (Item item : items) {
+                ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName));
+                FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+
+                if ( item.get(jsonDocument) != null ) {
+                    ByteArrayInputStream bais = new ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
+                    flowFile = session.importFrom(bais, flowFile);
+                }
+
+                session.transfer(flowFile,REL_SUCCESS);
+                keysToFlowFileMap.remove(itemKeys);
+            }
+
+            // Handle unprocessed keys
+            Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys();
+            if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
+                KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table);
+                List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys();
+
+                for (Map<String,AttributeValue> unprocessedKey : keys) {
+                    Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+                    Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+                    sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+                }
+            }
+
+            // Handle any remaining items
+            for (ItemKeys key : keysToFlowFileMap.keySet()) {
+                FlowFile flowFile = keysToFlowFileMap.get(key);
+                flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + key.toString() );
+                session.transfer(flowFile,REL_NOT_FOUND);
+                keysToFlowFileMap.remove(key);
+            }
+
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
new file mode 100644
index 0000000..9e53457
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Utility class to keep a map of keys and flow files
+ */
+class ItemKeys {
+
+    protected Object hashKey = "";
+    protected Object rangeKey = "";
+
+    public ItemKeys(Object hashKey, Object rangeKey) {
+        if ( hashKey != null )
+            this.hashKey = hashKey;
+        if ( rangeKey != null )
+            this.rangeKey = rangeKey;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this,ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this, false);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return EqualsBuilder.reflectionEquals(this, other, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
new file mode 100644
index 0000000..83fea37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
+@CapabilityDescription("Puts a document from DynamoDB based on hash and range key.  The table can have either hash and range or hash key alone."
+    + " Currently the keys supported are string and number and value can be json document. "
+    + "In case of hash and range keys both key are required for the operation."
+    + " The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db error status code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value"),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value")
+})
+public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+        Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+            HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
+            REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    /**
+     * Dyamodb max item size limit 400 kb
+     */
+    public static final int DYNAMODB_MAX_ITEM_SIZE = 400 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+        final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
+        final String charset = context.getProperty(DOCUMENT_CHARSET).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if (!isDataValid(flowFile, jsonDocument)) {
+                flowFile = session.putAttribute(flowFile, AWS_DYNAMO_DB_ITEM_SIZE_ERROR, "Max size of item + attribute should be 400kb but was " + flowFile.getSize() + jsonDocument.length());
+                session.transfer(flowFile, REL_FAILURE);
+                continue;
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, baos);
+
+            try {
+                if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) {
+                    tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
+                } else {
+                    tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withKeyComponent(rangeKeyName, rangeKeyValue)
+                        .withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
+                }
+            } catch (IOException ioe) {
+                getLogger().error("IOException while creating put item : " + ioe.getMessage());
+                flowFile = session.putAttribute(flowFile, DYNAMODB_ITEM_IO_ERROR, ioe.getMessage());
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
+                rangeKeyValueType, outcome);
+
+            // Handle any remaining flowfiles
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successful posted items to dynamodb : " + table);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    private boolean isDataValid(FlowFile flowFile, String jsonDocument) {
+        return (flowFile.getSize() + jsonDocument.length()) < DYNAMODB_MAX_ITEM_SIZE;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
+        return writeRequest.getPutRequest().getItem();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2d5460f..9eb1c7b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,5 +19,8 @@ org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
-org.apache.nifi.processors.aws.lambda.PutLambda
+org.apache.nifi.processors.aws.lambda.PutLambda
 org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
+org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
+org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
+org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
new file mode 100644
index 0000000..fa0e605
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+public class DeleteDynamoDBTest {
+
+    protected DeleteDynamoDB deleteDynamoDB;
+    protected BatchWriteItemResult result = new BatchWriteItemResult();
+    BatchWriteItemOutcome outcome;
+
+    @Before
+    public void setUp() {
+        outcome = new BatchWriteItemOutcome(result);
+        result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                return outcome;
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteSuccessfulWithMock() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
+        Map<String, List<WriteRequest>> unprocessed =
+                new HashMap<String, List<WriteRequest>>();
+        DeleteRequest delete = new DeleteRequest();
+        delete.addKeyEntry("hashS", new AttributeValue("h1"));
+        delete.addKeyEntry("rangeS", new AttributeValue("r1"));
+        WriteRequest write = new WriteRequest(delete);
+        List<WriteRequest> writes = new ArrayList<>();
+        writes.add(write);
+        unprocessed.put(stringHashStringRangeTableName, writes);
+        result.setUnprocessedItems(unprocessed);
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNoHashValueFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashWithRangeValueNoRangeNameFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashWithRangeNameNoRangeValueFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentHashSuccess() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "nonexistent");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentRangeSuccess() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "nonexistent");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsServiceException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new AmazonServiceException("serviceException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsClientException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new AmazonClientException("clientException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsRuntimeException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new RuntimeException("runtimeException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+}


Mime
View raw message