From: aldrin@apache.org
To: commits@nifi.apache.org
Reply-To: dev@nifi.apache.org
Date: Fri, 08 Apr 2016 15:23:23 -0000
Subject: [2/2] nifi git commit: NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors

NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors

This closes #224.

Signed-off-by: Aldrin Piri

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3155a8a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3155a8a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3155a8a

Branch: refs/heads/master
Commit: e3155a8a4ba49bb6c4f324ebf090a92d6dd97389
Parents: 41583e6
Author: mans2singh
Authored: Fri Apr 8 11:21:39 2016 -0400
Committer: Aldrin Piri
Committed: Fri Apr 8 11:23:04 2016 -0400

----------------------------------------------------------------------
 .../processors/aws/AbstractAWSProcessor.java    |   7 +
 .../aws/dynamodb/AbstractDynamoDBProcessor.java | 338 ++++++++++++
 .../AbstractWriteDynamoDBProcessor.java         |  69 +++
 .../processors/aws/dynamodb/DeleteDynamoDB.java | 161 ++++++
 .../processors/aws/dynamodb/GetDynamoDB.java    | 197 +++++++
 .../nifi/processors/aws/dynamodb/ItemKeys.java  |  53 ++
 .../processors/aws/dynamodb/PutDynamoDB.java    | 197 +++++++
 .../org.apache.nifi.processor.Processor         |   5 +-
 .../aws/dynamodb/DeleteDynamoDBTest.java        | 368 +++++++++++++
 .../aws/dynamodb/GetDynamoDBTest.java           | 530 +++++++++++++++++++
 .../aws/dynamodb/ITAbstractDynamoDBTest.java    | 139 +++++
 .../dynamodb/ITPutGetDeleteGetDynamoDBTest.java | 428 +++++++++++++++
 .../processors/aws/dynamodb/ItemKeysTest.java   |  68 +++
 .../aws/dynamodb/PutDynamoDBTest.java           | 460 ++++++++++++++++
 14 files changed, 3019 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index a2eb9df..e2c2196 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -287,4 +288,10 @@ public abstract class AbstractAWSProcessor {
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
----------------------------------------------------------------------

+    public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed")
+            .description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process "
+                + "all the items in the request. Typical reasons are insufficient table throughput capacity and exceeding the maximum bytes per request. "
+                + "Unprocessed FlowFiles can be retried with a new request.").build();
+
+    public static final AllowableValue ALLOWABLE_VALUE_STRING = new AllowableValue("string");
+    public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new AllowableValue("number");
+
+    public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = "dynamodb.key.error.unprocessed";
+    public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = "dynamodb.range.key.value.error";
+    public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = "dynamodb.hash.key.value.error";
+    public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = "dynamodb.key.error.not.found";
+    public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message";
+    public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
+    public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message";
+    public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
+    public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service";
+    public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable";
+    public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id";
+    public static final String DYNAMODB_ERROR_STATUS_CODE = "dynamodb.error.status.code";
+    public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = "dynamodb.item.hash.key.value";
+    public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = "dynamodb.item.range.key.value";
+    public static final String DYNAMODB_ITEM_IO_ERROR = "dynamodb.item.io.error";
+    public static final String AWS_DYNAMO_DB_ITEM_SIZE_ERROR = "dynamodb.item.size.error";
+
+    protected static final String DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE = "DynamoDB key not found : ";
+
+    public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The DynamoDB table name")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key value of the item")
+            .defaultValue("${dynamodb.item.hash.key.value}")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE = new PropertyDescriptor.Builder()
+            .name("Range Key Value")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${dynamodb.item.range.key.value}")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Hash Key Value Type")
+            .required(true)
+            .description("The hash key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
+            .name("Range Key Value Type")
+            .required(true)
+            .description("The range key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Hash Key Name")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key name of the item")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
+            .name("Range Key Name")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The range key name of the item")
+            .build();
+
+    public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
+            .name("Json Document attribute")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Json document to be retrieved from the dynamodb item")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch items for each request (between 1 and 50)")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createLongValidator(1, 50, true))
+            .defaultValue("1")
+            .description("The items to be retrieved in one batch")
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_CHARSET = new PropertyDescriptor.Builder()
+            .name("Character set of document")
+            .description("Character set of data in the document")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .defaultValue(Charset.defaultCharset().name())
+            .build();
+
+    protected volatile DynamoDB dynamoDB;
+
+    public static final Set<Relationship> dynamoDBrelationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return dynamoDBrelationships;
+    }
+
+    /**
+     * Create client using credentials provider.
+     * This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().debug("Creating client with credentials provider");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentialsProvider, config);
+
+        return client;
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().debug("Creating client with aws credentials");
+
+        final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentials, config);
+
+        return client;
+    }
+
+    protected Object getValue(ProcessContext context, PropertyDescriptor type, PropertyDescriptor value, FlowFile flowFile) {
+        if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            return context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+        }
+    }
+
+    protected Object getAttributeValue(ProcessContext context, PropertyDescriptor propertyType, AttributeValue value) {
+        if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
+            if ( value == null ) return null;
+            else return value.getS();
+        } else {
+            if ( value == null ) return null;
+            else return new BigDecimal(value.getN());
+        }
+    }
+
+    protected synchronized DynamoDB getDynamoDB() {
+        if ( dynamoDB == null )
+            dynamoDB = new DynamoDB(client);
+        return dynamoDB;
+    }
+
+    protected Object getValue(Map<String, AttributeValue> item, String keyName, String valueType) {
+        if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getS();
+        } else {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getN();
+        }
+    }
+
+    protected List<FlowFile> processException(final ProcessSession session, List<FlowFile> flowFiles, Exception exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processClientException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonClientException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processServiceException(final ProcessSession session, List<FlowFile> flowFiles,
+            AmazonServiceException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
+            attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() );
+            attributes.put(DYNAMODB_ERROR_TYPE, exception.getErrorType().name() );
+            attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
+            attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
+            attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    /**
+     * Send unprocessed items to the unprocessed relationship and remove the flow file from the key to flow file map
+     * @param session used for sending the flow file
+     * @param keysToFlowFileMap - ItemKeys to flow file map
+     * @param hashKeyValue the item's hash key value
+     * @param rangeKeyValue the item's range key value
+     */
+    protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object hashKeyValue, Object rangeKeyValue) {
+        ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
+
+        FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+        flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
+        session.transfer(flowFile, REL_UNPROCESSED);
+
+        getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
+
+        keysToFlowFileMap.remove(itemKeys);
+    }
+
+    protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object rangeKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+        boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
+        boolean isRangeValueNull = rangeKeyValue == null;
+        boolean isConsistent = true;
+        if ( ! isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+        if ( isRangeNameBlank && ( ! isRangeValueNull && ! StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+
+        if ( ! isConsistent ) {
+            getLogger().error("Range key name '" + rangeKeyName + "' was not consistent with range value '"
+                + rangeKeyValue + "' for flow file " + flowFile);
+            flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+                + "'/value '" + rangeKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        return isConsistent;
+
+    }
+
+    protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+
+        boolean isConsistent = true;
+
+        if ( hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) {
+            getLogger().error("Hash key value '" + hashKeyValue + "' is required for flow file " + flowFile);
+            flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
+                + "/value '" + hashKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+            isConsistent = false;
+        }
+
+        return isConsistent;
+
+    }
+}
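
A note on the typed key handling above: when a key value type property is set to "number", getValue returns a BigDecimal, which the DynamoDB Document API stores as a numeric attribute (type N) rather than a string (type S). A minimal stand-alone sketch of that conversion (the method name and inputs are illustrative, not part of the commit):

    static Object toKeyValue(String valueType, String rawValue) {
        // "string" and "number" mirror ALLOWABLE_VALUE_STRING / ALLOWABLE_VALUE_NUMBER above
        if ("string".equals(valueType)) {
            return rawValue;                           // serialized as DynamoDB type S
        }
        return new java.math.BigDecimal(rawValue);     // serialized as DynamoDB type N
    }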

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
new file mode 100644
index 0000000..e7ac523
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+public abstract class AbstractWriteDynamoDBProcessor extends AbstractDynamoDBProcessor {
+
+    /**
+     * Helper method to handle unprocessed items
+     * @param session process session
+     * @param keysToFlowFileMap map of DynamoDB primary keys to flow files
+     * @param table dynamodb table
+     * @param hashKeyName the hash key name
+     * @param hashKeyValueType the hash key value type
+     * @param rangeKeyName the range key name
+     * @param rangeKeyValueType the range key value type
+     * @param outcome the write outcome
+     */
+    protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
+            final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
+        BatchWriteItemResult result = outcome.getBatchWriteItemResult();
+
+        // Handle unprocessed items
+        List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
+        if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
+            for ( WriteRequest request : unprocessedItems) {
+                Map<String, AttributeValue> item = getRequestItem(request);
+                Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
+                Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
+
+                sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+            }
+        }
+    }
+
+    /**
+     * Get the request item key and attribute value
+     * @param writeRequest write request
+     * @return Map of keys and values
+     *
+     * @see PutDynamoDB#getRequestItem(WriteRequest)
+     * @see DeleteDynamoDB#getRequestItem(WriteRequest)
+     */
+    protected abstract Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest);
+}
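
The unprocessed-item handling above is driven entirely by what the SDK hands back in BatchWriteItemResult. A minimal sketch of building such a result by hand, assuming a table named "myTable" and key names "hashS"/"rangeS" (DeleteDynamoDBTest later in this email constructs the same structure):

    BatchWriteItemResult result = new BatchWriteItemResult();
    DeleteRequest delete = new DeleteRequest();
    delete.addKeyEntry("hashS", new AttributeValue("h1"));
    delete.addKeyEntry("rangeS", new AttributeValue("r1"));
    Map<String, List<WriteRequest>> unprocessed = new HashMap<>();
    unprocessed.put("myTable", Collections.singletonList(new WriteRequest(delete)));
    result.setUnprocessedItems(unprocessed);
    // handleUnprocessedItems(...) walks this map, resolves each request back to an
    // ItemKeys via getRequestItem(...), and routes the matching flow file to REL_UNPROCESSED.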

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
new file mode 100644
index 0000000..a82de34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"})
+@CapabilityDescription("Deletes a document from DynamoDB based on hash and range key. The key can be string or number."
+    + " The request requires all the primary keys for the operation (hash or hash and range key)")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Item's hash key value" ),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Item's range key value" ),
+    })
+public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+        Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+            HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
+            CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
+                continue;
+            }
+
+            if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableWriteItems.addHashOnlyPrimaryKeysToDelete(hashKeyName, hashKeyValue);
+            } else {
+                tableWriteItems.addHashAndRangePrimaryKeyToDelete(hashKeyName,
+                    hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+        }
+
+        if ( keysToFlowFileMap.isEmpty() ) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
+                rangeKeyValueType, outcome);
+
+            // All items not marked unprocessed are successful
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successfully deleted item from dynamodb : " + table);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
+        return writeRequest.getDeleteRequest().getKey();
+    }
+}
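
For a concrete picture of how DeleteDynamoDB is configured, the following condenses the TestRunner setup used by the unit tests later in this email (region and table name are illustrative):

    final TestRunner runner = TestRunners.newTestRunner(DeleteDynamoDB.class);
    runner.setProperty(AbstractDynamoDBProcessor.REGION, "us-west-2");      // illustrative
    runner.setProperty(AbstractDynamoDBProcessor.TABLE, "myTable");         // illustrative
    runner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    runner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
    runner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    runner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    runner.enqueue(new byte[]{});
    runner.run(1);
    // With both keys present and consistent, the flow file is routed to REL_SUCCESS.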

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
new file mode 100644
index 0000000..808600c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves a document from DynamoDB based on hash and range key. The key can be string or number. "
+    + "For any get request all the primary keys are required (hash or hash and range based on the table keys). "
+    + "A Json Document ('Map') attribute of the DynamoDB item is read into the content of the FlowFile.")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Item's hash key value" ),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Item's range key value" ),
+    })
+public class GetDynamoDB extends AbstractDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+        Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+            HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
+            CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("FlowFiles are routed to not found relationship if key not found in the table").build();
+
+    public static final Set<Relationship> getDynamoDBrelationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED, REL_NOT_FOUND)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return getDynamoDBrelationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+        TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
+                continue;
+            }
+
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+
+            if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue);
+            } else {
+                tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);
+
+            // Handle processed items and get the json document
+            List<Item> items = result.getTableItems().get(table);
+            for (Item item : items) {
+                ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName));
+                FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+
+                if ( item.get(jsonDocument) != null ) {
+                    ByteArrayInputStream bais = new ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
+                    flowFile = session.importFrom(bais, flowFile);
+                }
+
+                session.transfer(flowFile, REL_SUCCESS);
+                keysToFlowFileMap.remove(itemKeys);
+            }
+
+            // Handle unprocessed keys
+            Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys();
+            if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
+                KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table);
+                List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys();
+
+                for (Map<String, AttributeValue> unprocessedKey : keys) {
+                    Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+                    Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+                    sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+                }
+            }
+
+            // Handle any remaining items (iterate over a copy of the key set to avoid
+            // a ConcurrentModificationException while removing entries)
+            for (ItemKeys key : new ArrayList<>(keysToFlowFileMap.keySet())) {
+                FlowFile flowFile = keysToFlowFileMap.get(key);
+                flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + key.toString() );
+                session.transfer(flowFile, REL_NOT_FOUND);
+                keysToFlowFileMap.remove(key);
+            }
+
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+}
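
Because Hash Key Value and Range Key Value default to ${dynamodb.item.hash.key.value} and ${dynamodb.item.range.key.value}, the keys can ride in on the flow file itself rather than being set statically. A hedged TestRunner sketch (table, key, and document attribute names are illustrative):

    final TestRunner runner = TestRunners.newTestRunner(GetDynamoDB.class);
    runner.setProperty(AbstractDynamoDBProcessor.REGION, "us-west-2");
    runner.setProperty(AbstractDynamoDBProcessor.TABLE, "myTable");
    runner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    runner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("dynamodb.item.hash.key.value", "h1"); // resolved by the default EL expression
    runner.enqueue(new byte[]{}, attributes);
    runner.run(1);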

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
new file mode 100644
index 0000000..9e53457
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Utility class to keep a map of keys and flow files
+ */
+class ItemKeys {
+
+    protected Object hashKey = "";
+    protected Object rangeKey = "";
+
+    public ItemKeys(Object hashKey, Object rangeKey) {
+        if ( hashKey != null )
+            this.hashKey = hashKey;
+        if ( rangeKey != null )
+            this.rangeKey = rangeKey;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this, false);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return EqualsBuilder.reflectionEquals(this, other, false);
+    }
+}
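
Since ItemKeys relies on reflection-based equals/hashCode, two instances built from equal key values are interchangeable as map keys, which is exactly how the processors correlate SDK results back to flow files. A quick sketch (the map's value type is illustrative):

    Map<ItemKeys, String> map = new HashMap<>();
    map.put(new ItemKeys("h1", "r1"), "flow file placeholder");
    String found = map.get(new ItemKeys("h1", "r1"));  // hit: fields compare equal
    // Null keys normalize to "" in the constructor, so hash-only keys match too:
    boolean eq = new ItemKeys("h1", null).equals(new ItemKeys("h1", null)); // true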

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
new file mode 100644
index 0000000..83fea37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
+@CapabilityDescription("Puts a document into DynamoDB based on hash and range key. The table can have either hash and range or hash key alone."
+    + " Currently the keys supported are string and number and the value can be a json document. "
+    + "In case of hash and range keys both keys are required for the operation."
+    + " The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+    @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Item's hash key value"),
+    @ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Item's range key value")
+})
+public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+        Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
+            HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
+            REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    /**
+     * DynamoDB max item size limit (400 KB)
+     */
+    public static final int DYNAMODB_MAX_ITEM_SIZE = 400 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+        final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
+        final String charset = context.getProperty(DOCUMENT_CHARSET).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile)) {
+                continue;
+            }
+
+            if (!isDataValid(flowFile, jsonDocument)) {
+                flowFile = session.putAttribute(flowFile, AWS_DYNAMO_DB_ITEM_SIZE_ERROR,
+                    "Max size of item + attribute should be 400kb but was " + (flowFile.getSize() + jsonDocument.length()));
+                session.transfer(flowFile, REL_FAILURE);
+                continue;
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, baos);
+
+            try {
+                if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) {
+                    tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
+                } else {
+                    tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withKeyComponent(rangeKeyName, rangeKeyValue)
+                        .withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
+                }
+            } catch (IOException ioe) {
+                getLogger().error("IOException while creating put item : " + ioe.getMessage());
+                flowFile = session.putAttribute(flowFile, DYNAMODB_ITEM_IO_ERROR, ioe.getMessage());
+                session.transfer(flowFile, REL_FAILURE);
+                // Skip the map so a failed flow file is not also routed to success below
+                continue;
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
+                rangeKeyValueType, outcome);
+
+            // Handle any remaining flowfiles
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successfully posted items to DynamoDB : " + table);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    private boolean isDataValid(FlowFile flowFile, String jsonDocument) {
+        return (flowFile.getSize() + jsonDocument.length()) < DYNAMODB_MAX_ITEM_SIZE;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
+        return writeRequest.getPutRequest().getItem();
+    }
+
+}
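
The 400 KB guard mirrors DynamoDB's per-item limit; note that it counts the flow file size plus the document attribute name length. A hedged sketch of the put-item construction the processor performs, using the Document API directly (table, key, and attribute names are illustrative):

    String json = "{\"name\": \"example\"}";
    Item item = new Item()
        .withKeyComponent("hashS", "h1")
        .withKeyComponent("rangeS", "r1")
        .withJSON("document", json);  // flow file content lands under the "document" attribute
    TableWriteItems writes = new TableWriteItems("myTable").addItemToPut(item);
    // dynamoDB.batchWriteItem(writes) would then persist the item.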

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2d5460f..9eb1c7b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,5 +19,8 @@ org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
-org.apache.nifi.processors.aws.lambda.PutLambda
+org.apache.nifi.processors.aws.lambda.PutLambda
 org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
+org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
+org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
+org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
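
These service entries are how the framework discovers the new processors: the file is consumed through Java's standard ServiceLoader mechanism. Roughly, and as a simplified sketch rather than NiFi's actual bootstrap code:

    ServiceLoader<Processor> loader = ServiceLoader.load(Processor.class);
    for (Processor p : loader) {
        // Iterating the loader now yields GetDynamoDB, PutDynamoDB and DeleteDynamoDB
        // alongside the other AWS processors listed in the file.
        System.out.println(p.getClass().getName());
    }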
+ */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; +import com.amazonaws.services.dynamodbv2.model.DeleteRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +public class DeleteDynamoDBTest { + + protected DeleteDynamoDB deleteDynamoDB; + protected BatchWriteItemResult result = new BatchWriteItemResult(); + BatchWriteItemOutcome outcome; + + @Before + public void setUp() { + outcome = new BatchWriteItemOutcome(result); + result.setUnprocessedItems(new HashMap>()); + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) { + return outcome; + } + }; + + deleteDynamoDB = new DeleteDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + } + + @Test + public void testStringHashStringRangeDeleteOnlyHashFailure() { + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile); + } + + } + + @Test + public void testStringHashStringRangeDeleteSuccessfulWithMock() { + final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB); + + deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + } + + @Test + public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() { + Map> unprocessed = + new HashMap>(); + DeleteRequest delete = new DeleteRequest(); + delete.addKeyEntry("hashS", new AttributeValue("h1")); + delete.addKeyEntry("rangeS", new AttributeValue("r1")); + WriteRequest write = new WriteRequest(delete); + List writes = new ArrayList<>(); + writes.add(write); + unprocessed.put(stringHashStringRangeTableName, writes); + result.setUnprocessedItems(unprocessed); + final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB); + + deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1); + + } + + @Test + public void testStringHashStringRangeDeleteNoHashValueFailure() { + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR)); + } + + } + + @Test + public void testStringHashStringRangeDeleteOnlyHashWithRangeValueNoRangeNameFailure() { + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + 
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashWithRangeValueNoRangeNameFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashWithRangeNameNoRangeValueFailure() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    // DynamoDB deletes are idempotent, so deleting a non-existent key is still
+    // a success; the stubbed client mirrors that by returning an empty outcome.
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentHashSuccess() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "nonexistent");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentRangeSuccess() {
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "nonexistent");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
+    }
+
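+    // The remaining tests cover the exception paths: service, client, and
+    // runtime exceptions thrown by the DynamoDB client should all route the
+    // FlowFile to failure, with the message preserved in the
+    // dynamodb.error.exception.message attribute.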
+    @Test
+    public void testStringHashStringRangeDeleteThrowsServiceException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new AmazonServiceException("serviceException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)",
+                    flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsClientException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new AmazonClientException("clientException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsRuntimeException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
+                throw new RuntimeException("runtimeException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+    }
+}
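
The credential, region, table, and key properties above are set nearly verbatim in every test. If that duplication ever becomes a maintenance burden, a small helper along the lines of the sketch below could absorb the shared setup; the helper name and shape are hypothetical, offered as an illustration rather than part of this commit:

    // Hypothetical helper (illustration only): builds a TestRunner with the
    // property values shared by most tests in DeleteDynamoDBTest; individual
    // tests then add or override only the keys they exercise.
    private TestRunner newDeleteRunner(final DeleteDynamoDB processor) {
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY, "abcd");
        runner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
        runner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
        runner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
        runner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
        runner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
        return runner;
    }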